diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index c208499c25..6b966d0f84 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -1091,10 +1091,11 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
 }
 
 func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
-	modInfos := make([]*datapb.SegmentInfo, len(segmentsCompactFrom))
-	for i := range segmentsCompactFrom {
-		modInfos[i] = segmentsCompactFrom[i].SegmentInfo
+	modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
+	for _, segment := range segmentsCompactFrom {
+		modInfos = append(modInfos, segment.SegmentInfo)
 	}
+
 	newSegment := segmentCompactTo.SegmentInfo
 	modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
 
@@ -1110,7 +1111,12 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
 		zap.Int("delta logs", len(newSegment.GetDeltalogs())),
 		zap.Int64("compact to segment", newSegment.GetID()))
 
-	err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment)
+	err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegment), metastore.BinlogsIncrement{
+		Segment:    newSegment,
+		Insertlogs: newSegment.GetBinlogs(),
+		Deltalogs:  newSegment.GetDeltalogs(),
+		Statslogs:  newSegment.GetStatslogs(),
+	})
 	if err != nil {
 		log.Warn("fail to alter segments and new segment", zap.Error(err))
 		return err
diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go
index 61f473f136..9fc3611391 100644
--- a/internal/metastore/kv/datacoord/kv_catalog.go
+++ b/internal/metastore/kv/datacoord/kv_catalog.go
@@ -254,12 +254,12 @@ func (kc *Catalog) LoadFromSegmentPath(colID, partID, segID typeutil.UniqueID) (
 	return segInfo, nil
 }
 
-func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error {
-	if len(newSegments) == 0 {
+func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error {
+	if len(segments) == 0 {
 		return nil
 	}
 	kvs := make(map[string]string)
-	for _, segment := range newSegments {
+	for _, segment := range segments {
 		kc.collectMetrics(segment)
 
 		// we don't persist binlog fields, but instead store binlogs as independent kvs
@@ -271,6 +271,14 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
 			cloned.NumOfRows = rowCount
 		}
 
+		if segment.GetState() == commonpb.SegmentState_Dropped {
+			binlogs, err := kc.handleDroppedSegment(segment)
+			if err != nil {
+				return err
+			}
+			maps.Copy(kvs, binlogs)
+		}
+
 		k, v, err := buildSegmentKv(cloned)
 		if err != nil {
 			return err
@@ -288,6 +296,26 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
 		maps.Copy(kvs, binlogKvs)
 	}
 
+	return kc.SaveByBatch(kvs)
+}
+
+func (kc *Catalog) handleDroppedSegment(segment *datapb.SegmentInfo) (kvs map[string]string, err error) {
+	var has bool
+	has, err = kc.hasBinlogPrefix(segment)
+	if err != nil {
+		return
+	}
+	// To be compatible with previous implementation, we have to write binlogs on etcd for correct gc.
+	if !has {
+		kvs, err = buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), cloneLogs(segment.GetBinlogs()), cloneLogs(segment.GetDeltalogs()), cloneLogs(segment.GetStatslogs()), true)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (kc *Catalog) SaveByBatch(kvs map[string]string) error {
 	saveFn := func(partialKvs map[string]string) error {
 		return kc.MetaKv.MultiSave(partialKvs)
 	}
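
Note on the new SaveByBatch helper introduced above: it takes the accumulated segment and binlog kvs and persists them in chunks so that each underlying MultiSave stays within the metastore's per-transaction limit. The following self-contained Go sketch illustrates that batching pattern under stated assumptions; saveByBatch, maxOpsPerTxn, and the inline saveFn are illustrative names for this sketch only, not the actual Milvus helpers or limits.

package main

import "fmt"

// saveByBatch splits the full kv map into fixed-size chunks and hands each
// chunk to saveFn, mirroring the pattern Catalog.SaveByBatch delegates to.
// maxOpsPerTxn is a hypothetical per-transaction key limit for this sketch.
func saveByBatch(kvs map[string]string, maxOpsPerTxn int, saveFn func(map[string]string) error) error {
	batch := make(map[string]string, maxOpsPerTxn)
	for k, v := range kvs {
		batch[k] = v
		if len(batch) == maxOpsPerTxn {
			if err := saveFn(batch); err != nil {
				return err
			}
			batch = make(map[string]string, maxOpsPerTxn)
		}
	}
	// Flush the final, possibly partial, batch.
	if len(batch) > 0 {
		return saveFn(batch)
	}
	return nil
}

func main() {
	kvs := map[string]string{"segment/1": "a", "segment/2": "b", "segment/3": "c"}
	err := saveByBatch(kvs, 2, func(partial map[string]string) error {
		fmt.Println("MultiSave batch of", len(partial), "keys")
		return nil
	})
	if err != nil {
		fmt.Println("save failed:", err)
	}
}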