From bccdef1ad7157328c0227b640c0feee0576ef32e Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 19 Jul 2023 15:28:57 +0800 Subject: [PATCH] Fix segment meta only record last flushed binlog (#25707) Signed-off-by: longjiquan --- internal/datacoord/meta.go | 10 ++-------- internal/metastore/catalog.go | 5 +---- internal/metastore/kv/datacoord/kv_catalog.go | 5 +++-- .../metastore/kv/datacoord/kv_catalog_test.go | 15 +++------------ 4 files changed, 9 insertions(+), 26 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 96e8749632..000deea09f 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -590,10 +590,7 @@ func (m *meta) UpdateFlushSegmentsInfo( } if err := m.catalog.AlterSegments(m.ctx, segments, metastore.BinlogsIncrement{ - Segment: clonedSegment.SegmentInfo, - Insertlogs: binlogs, - Statslogs: statslogs, - Deltalogs: deltalogs, + Segment: clonedSegment.SegmentInfo, }); err != nil { log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd", zap.Error(err)) @@ -1107,10 +1104,7 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm zap.Int64("compact to segment", newSegment.GetID())) err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegment), metastore.BinlogsIncrement{ - Segment: newSegment, - Insertlogs: newSegment.GetBinlogs(), - Deltalogs: newSegment.GetDeltalogs(), - Statslogs: newSegment.GetStatslogs(), + Segment: newSegment, }) if err != nil { log.Warn("fail to alter segments and new segment", zap.Error(err)) diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index c7eb98f633..eddb466848 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -103,10 +103,7 @@ func (t AlterType) String() string { } type BinlogsIncrement struct { - Segment *datapb.SegmentInfo - Insertlogs []*datapb.FieldBinlog - Statslogs []*datapb.FieldBinlog - Deltalogs []*datapb.FieldBinlog + Segment *datapb.SegmentInfo } //go:generate mockery --name=DataCoordCatalog --with-expecter diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 2436b5b293..0e44f4c59a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -288,8 +288,9 @@ func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.Segment for _, b := range binlogs { segment := b.Segment - binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), - segment.GetID(), cloneLogs(b.Insertlogs), cloneLogs(b.Deltalogs), cloneLogs(b.Statslogs), len(segment.GetCompactionFrom()) > 0) + binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), + cloneLogs(segment.GetBinlogs()), cloneLogs(segment.GetDeltalogs()), cloneLogs(segment.GetStatslogs()), + len(segment.GetCompactionFrom()) > 0) if err != nil { return err } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index d8e2cadb3e..329bc8016a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -329,10 +329,7 @@ func Test_AlterSegments(t *testing.T) { catalog := NewCatalog(metakv, rootPath, "") assert.Panics(t, func() { catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{ - Segment: invalidSegment, - Insertlogs: invalidSegment.Binlogs, - Statslogs: invalidSegment.Statslogs, - Deltalogs: invalidSegment.Deltalogs, + Segment: invalidSegment, }) }) }) @@ -360,10 +357,7 @@ func Test_AlterSegments(t *testing.T) { assert.NoError(t, err) err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment1}, metastore.BinlogsIncrement{ - Segment: segment1, - Insertlogs: segment1.Binlogs, - Statslogs: segment1.Statslogs, - Deltalogs: segment1.Deltalogs, + Segment: segment1, }) assert.NoError(t, err) @@ -423,10 +417,7 @@ func Test_AlterSegments(t *testing.T) { err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL}, metastore.BinlogsIncrement{ - Segment: segmentXL, - Insertlogs: segmentXL.Binlogs, - Statslogs: segmentXL.Statslogs, - Deltalogs: segmentXL.Deltalogs, + Segment: segmentXL, }) assert.NoError(t, err) assert.Equal(t, 255+3, len(savedKvs))