From 7bd9eceb153975ae5aafb6bfbbc4c7b5c1343db7 Mon Sep 17 00:00:00 2001 From: Filip Haltmayer <81822489+filip-halt@users.noreply.github.com> Date: Thu, 27 Jul 2023 19:13:03 -0700 Subject: [PATCH] Remove unused KV functions (#25972) Remove RevertAlterSegmentsAndAddNewSegment Remove AlterIndex Remove AlterSegmentsAndAddNewSegment Remove AlterSegmentIndex Remove IndexCoordCatalog Signed-off-by: Filip Haltmayer --- internal/metastore/catalog.go | 18 --- internal/metastore/kv/datacoord/kv_catalog.go | 86 ------------ .../metastore/kv/datacoord/kv_catalog_test.go | 129 ------------------ 3 files changed, 233 deletions(-) diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index eddb466848..ce7732b842 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -112,8 +112,6 @@ type DataCoordCatalog interface { AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error // TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...BinlogsIncrement) error - // AlterSegmentsAndAddNewSegment for transaction - AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error @@ -129,33 +127,17 @@ type DataCoordCatalog interface { CreateIndex(ctx context.Context, index *model.Index) error ListIndexes(ctx context.Context) ([]*model.Index, error) - AlterIndex(ctx context.Context, newIndex *model.Index) error AlterIndexes(ctx context.Context, newIndexes []*model.Index) error DropIndex(ctx context.Context, collID, dropIdxID typeutil.UniqueID) error CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) - AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool } -type IndexCoordCatalog interface { - CreateIndex(ctx context.Context, index *model.Index) error - ListIndexes(ctx context.Context) ([]*model.Index, error) - AlterIndex(ctx context.Context, newIndex *model.Index) error - AlterIndexes(ctx context.Context, newIndexes []*model.Index) error - DropIndex(ctx context.Context, collID, dropIdxID typeutil.UniqueID) error - - CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error - ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) - AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error - AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error - DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error -} - type QueryCoordCatalog interface { SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error SavePartition(info ...*querypb.PartitionLoadInfo) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 0e44f4c59a..5470467102 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -406,84 +406,6 @@ func (kc *Catalog) hasBinlogPrefix(segment *datapb.SegmentInfo) (bool, error) { return hasBinlogPrefix || hasDeltaPrefix || hasStatsPrefix, nil } -func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { - kvs := make(map[string]string) - - for _, s := range segments { - noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(s) - // `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`. - segmentutil.ReCalcRowCount(s, noBinlogsSegment) - // for compacted segments - if noBinlogsSegment.State == commonpb.SegmentState_Dropped { - hasBinlogkeys, err := kc.hasBinlogPrefix(s) - if err != nil { - return err - } - - // In order to guarantee back compatibility, the old format segments need - // convert to new format that include segment key and three binlog keys, - // or GC can not find data path on the storage. - if !hasBinlogkeys { - binlogsKvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs, true) - if err != nil { - return err - } - maps.Copy(kvs, binlogsKvs) - } - } - - k, v, err := buildSegmentKv(noBinlogsSegment) - if err != nil { - return err - } - kvs[k] = v - } - - if newSegment != nil { - segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment) - if err != nil { - return err - } - maps.Copy(kvs, segmentKvs) - if newSegment.NumOfRows > 0 { - kc.collectMetrics(newSegment) - } - } - return kc.MetaKv.MultiSave(kvs) -} - -// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AlterSegmentsAndAddNewSegment -func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error { - var ( - kvs = make(map[string]string) - removals []string - ) - - for _, s := range oldSegments { - segmentKvs, err := buildSegmentAndBinlogsKvs(s) - if err != nil { - return err - } - - maps.Copy(kvs, segmentKvs) - } - - if removeSegment != nil { - segKey := buildSegmentPath(removeSegment.GetCollectionID(), removeSegment.GetPartitionID(), removeSegment.GetID()) - removals = append(removals, segKey) - binlogKeys := buildBinlogKeys(removeSegment) - removals = append(removals, binlogKeys...) - } - - err := kc.MetaKv.MultiSaveAndRemove(kvs, removals) - if err != nil { - log.Warn("batch save and remove segments failed", zap.Error(err)) - return err - } - - return nil -} - func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { if len(segments) == 0 { return nil @@ -685,10 +607,6 @@ func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { return indexes, nil } -func (kc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error { - return kc.CreateIndex(ctx, index) -} - func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) error { kvs := make(map[string]string) for _, index := range indexes { @@ -755,10 +673,6 @@ func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentInde return segIndexes, nil } -func (kc *Catalog) AlterSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error { - return kc.CreateSegmentIndex(ctx, segIdx) -} - func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.SegmentIndex) error { kvs := make(map[string]string) for _, segIdx := range segIdxes { diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 329bc8016a..759d542cec 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -431,62 +431,6 @@ func Test_AlterSegments(t *testing.T) { }) } -func Test_AlterSegmentsAndAddNewSegment(t *testing.T) { - t.Run("save error", func(t *testing.T) { - metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error")) - - catalog := NewCatalog(metakv, rootPath, "") - err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{}, segment1) - assert.Error(t, err) - }) - - t.Run("get prefix fail", func(t *testing.T) { - metakv := mocks.NewMetaKv(t) - metakv.EXPECT().HasPrefix(mock.Anything).Return(false, errors.New("error")) - - catalog := NewCatalog(metakv, rootPath, "") - err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, nil) - assert.Error(t, err) - }) - - t.Run("save successfully", func(t *testing.T) { - savedKvs := make(map[string]string, 0) - metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiSave(mock.Anything).RunAndReturn(func(m map[string]string) error { - maps.Copy(savedKvs, m) - return nil - }) - metakv.EXPECT().Load(mock.Anything).RunAndReturn(func(s string) (string, error) { - if v, ok := savedKvs[s]; ok { - return v, nil - } - return "", errors.New("key not found") - }) - metakv.EXPECT().HasPrefix(mock.Anything).Return(false, nil) - - catalog := NewCatalog(metakv, rootPath, "") - err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1) - assert.NoError(t, err) - - assert.Equal(t, 8, len(savedKvs)) - verifySavedKvsForDroppedSegment(t, savedKvs) - verifySavedKvsForSegment(t, savedKvs) - - adjustedSeg, err := catalog.LoadFromSegmentPath(droppedSegment.CollectionID, droppedSegment.PartitionID, droppedSegment.ID) - assert.NoError(t, err) - // Check that num of rows is corrected from 100 to 5. - assert.Equal(t, int64(100), droppedSegment.GetNumOfRows()) - assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) - - adjustedSeg, err = catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID) - assert.NoError(t, err) - // Check that num of rows is corrected from 100 to 5. - assert.Equal(t, int64(100), droppedSegment.GetNumOfRows()) - assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) - }) -} - func Test_DropSegment(t *testing.T) { t.Run("remove failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) @@ -595,25 +539,6 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) { } } -func TestCatalog_RevertAlterSegmentsAndAddNewSegment(t *testing.T) { - t.Run("save error", func(t *testing.T) { - txn := mocks.NewMetaKv(t) - txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(errors.New("mock error")) - - catalog := NewCatalog(txn, rootPath, "") - err := catalog.RevertAlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{segment1}, droppedSegment) - assert.Error(t, err) - }) - - t.Run("revert successfully", func(t *testing.T) { - txn := mocks.NewMetaKv(t) - txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(nil) - catalog := NewCatalog(txn, rootPath, "") - err := catalog.RevertAlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{segment1}, droppedSegment) - assert.NoError(t, err) - }) -} - func TestChannelCP(t *testing.T) { mockVChannel := "fake-by-dev-rootcoord-dml-1-testchannelcp-v0" mockPChannel := "fake-by-dev-rootcoord-dml-1" @@ -907,30 +832,6 @@ func TestCatalog_ListIndexes(t *testing.T) { }) } -func TestCatalog_AlterIndex(t *testing.T) { - i := &model.Index{ - CollectionID: 0, - FieldID: 0, - IndexID: 0, - IndexName: "", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: nil, - } - t.Run("add", func(t *testing.T) { - txn := mocks.NewMetaKv(t) - txn.EXPECT().Save(mock.Anything, mock.Anything).Return(nil) - - catalog := &Catalog{ - MetaKv: txn, - } - - err := catalog.AlterIndex(context.Background(), i) - assert.NoError(t, err) - }) -} - func TestCatalog_AlterIndexes(t *testing.T) { i := &model.Index{ CollectionID: 0, @@ -1073,36 +974,6 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { }) } -func TestCatalog_AlterSegmentIndex(t *testing.T) { - segIdx := &model.SegmentIndex{ - SegmentID: 0, - CollectionID: 0, - PartitionID: 0, - NumRows: 0, - IndexID: 0, - BuildID: 0, - NodeID: 0, - IndexState: 0, - FailReason: "", - IndexVersion: 0, - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: nil, - IndexSize: 0, - } - - t.Run("add", func(t *testing.T) { - metakv := mocks.NewMetaKv(t) - metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(nil) - catalog := &Catalog{ - MetaKv: metakv, - } - - err := catalog.AlterSegmentIndex(context.Background(), segIdx) - assert.NoError(t, err) - }) -} - func TestCatalog_AlterSegmentIndexes(t *testing.T) { segIdx := &model.SegmentIndex{ SegmentID: 0,