From 603cd1fb3fc8fa2760b949d4d88b786bac2fedab Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 11 Jan 2024 15:02:50 +0800 Subject: [PATCH] fix: Drop segment meta info with prefix (#29857) pr: #29856 If segment has more than 128 log fils, drop segment will exceed etcd txn ops limit, which will failed the drop segment request This PR drop segment meta info with prefix, to avoid drop segment meta failed --------- Signed-off-by: Wei Liu --- internal/datacoord/meta_test.go | 1 + internal/kv/etcd/embed_etcd_kv_test.go | 1 + internal/metastore/kv/datacoord/kv_catalog.go | 10 ++++++---- internal/metastore/kv/datacoord/kv_catalog_test.go | 12 +++++++++--- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 1c4d9ad7fe..28d3fcf3c9 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -333,6 +333,7 @@ func TestMeta_Basic(t *testing.T) { metakv2.EXPECT().MultiRemove(mock.Anything).Return(errors.New("failed")).Maybe() metakv2.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() metakv2.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() + metakv2.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("failed")) catalog = datacoord.NewCatalog(metakv2, "", "") meta, err = newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index d41684ff2d..fa4272d4c9 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -565,6 +565,7 @@ func TestEmbedEtcd(te *testing.T) { {map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3}, {map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0}, {map[string]string{}, []string{"p"}, "p", 1, 0}, + {nil, []string{"p"}, "p", 0, 0}, } for _, test := range multiSaveAndRemoveWithPrefixTests { diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index b1864d43dc..5a8f22692b 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -437,10 +437,12 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { segKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - keys := []string{segKey} - binlogKeys := buildBinlogKeys(segment) - keys = append(keys, binlogKeys...) - if err := kc.MetaKv.MultiRemove(keys); err != nil { + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + + keys := []string{segKey, binlogPreix, deltalogPreix, statelogPreix} + if err := kc.MetaKv.MultiSaveAndRemoveWithPrefix(nil, keys); err != nil { return err } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index a87c2507ee..079c778589 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -429,7 +430,7 @@ func Test_AlterSegments(t *testing.T) { func Test_DropSegment(t *testing.T) { t.Run("remove failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).Return(errors.New("error")) + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error")) catalog := NewCatalog(metakv, rootPath, "") err := catalog.DropSegment(context.TODO(), segment1) @@ -439,7 +440,7 @@ func Test_DropSegment(t *testing.T) { t.Run("remove successfully", func(t *testing.T) { removedKvs := make(map[string]struct{}, 0) metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).RunAndReturn(func(s []string) error { + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(m map[string]string, s []string, p ...predicates.Predicate) error { for _, key := range s { removedKvs[key] = struct{}{} } @@ -450,8 +451,13 @@ func Test_DropSegment(t *testing.T) { err := catalog.DropSegment(context.TODO(), segment1) assert.NoError(t, err) + segKey := buildSegmentPath(segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + assert.Equal(t, 4, len(removedKvs)) - for _, k := range []string{k1, k2, k3, k5} { + for _, k := range []string{segKey, binlogPreix, deltalogPreix, statelogPreix} { _, ok := removedKvs[k] assert.True(t, ok) }