diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 04c925fbd7..f482194355 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -162,6 +162,9 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interf for i := int64(0); i < dData.RowCount; i++ { pk := dData.Pks[i] ts := dData.Tss[i] + if lastTS, ok := pk2ts[pk.GetValue()]; ok && lastTS > ts { + ts = lastTS + } pk2ts[pk.GetValue()] = ts } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index e224934e66..37274a80e4 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -197,6 +197,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 4, 5, 1, + 2, }, []Timestamp{ 20000, @@ -205,6 +206,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { 30000, 50000, 50000, + 10000, }) require.NoError(t, err) @@ -232,6 +234,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { if test.isvalid { assert.NoError(t, err) assert.Equal(t, 5, len(pk2ts)) + assert.EqualValues(t, 20001, pk2ts[UniqueID(2)]) } else { assert.Error(t, err) assert.Nil(t, pk2ts)