diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 93483c6f8c..fb6a259444 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -157,7 +157,9 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interf for i := int64(0); i < dData.RowCount; i++ { pk := dData.Pks[i] ts := dData.Tss[i] - + if lastTS, ok := pk2ts[pk.GetValue()]; ok && lastTS > ts { + ts = lastTS + } pk2ts[pk.GetValue()] = ts } } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index aa965093c4..a2682e2be0 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -178,6 +178,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 4, 5, 1, + 2, }, []Timestamp{ 20000, @@ -186,6 +187,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 30000, 50000, 50000, + 10000, }) require.NoError(t, err) @@ -213,6 +215,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { if test.isvalid { assert.NoError(t, err) assert.Equal(t, 5, len(pk2ts)) + assert.EqualValues(t, 20001, pk2ts[UniqueID(2)]) } else { assert.Error(t, err) assert.Nil(t, pk2ts)