From 9fc5f1176c3e076c57dfc5387d94288335ae4ae5 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 11 Jan 2024 12:22:51 +0800 Subject: [PATCH] fix: Drop segment meta info with prefix (#29856) If segment has more than 128 log fils, drop segment will exceed etcd txn ops limit, which will failed the drop segment request This PR drop segment meta info with prefix, to avoid drop segment meta failed --------- Signed-off-by: Wei Liu --- internal/datacoord/meta_test.go | 1 + internal/kv/etcd/embed_etcd_kv_test.go | 1 + internal/metastore/kv/datacoord/kv_catalog.go | 10 ++++++---- internal/metastore/kv/datacoord/kv_catalog_test.go | 12 +++++++++--- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index a744416087..7fafa5ebdc 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -333,6 +333,7 @@ func TestMeta_Basic(t *testing.T) { metakv2.EXPECT().MultiRemove(mock.Anything).Return(errors.New("failed")).Maybe() metakv2.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() metakv2.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() + metakv2.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("failed")) catalog = datacoord.NewCatalog(metakv2, "", "") meta, err = newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index d41684ff2d..fa4272d4c9 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -565,6 +565,7 @@ func TestEmbedEtcd(te *testing.T) { {map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3}, {map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0}, {map[string]string{}, []string{"p"}, "p", 1, 0}, + {nil, []string{"p"}, "p", 0, 0}, } for _, test := range multiSaveAndRemoveWithPrefixTests { diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 8a5a7a3ae7..e0f67c783b 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -426,10 +426,12 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { segKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - keys := []string{segKey} - binlogKeys := buildBinlogKeys(segment) - keys = append(keys, binlogKeys...) - if err := kc.MetaKv.MultiRemove(keys); err != nil { + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + + keys := []string{segKey, binlogPreix, deltalogPreix, statelogPreix} + if err := kc.MetaKv.MultiSaveAndRemoveWithPrefix(nil, keys); err != nil { return err } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index a87c2507ee..079c778589 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -429,7 +430,7 @@ func Test_AlterSegments(t *testing.T) { func Test_DropSegment(t *testing.T) { t.Run("remove failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).Return(errors.New("error")) + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error")) catalog := NewCatalog(metakv, rootPath, "") err := catalog.DropSegment(context.TODO(), segment1) @@ -439,7 +440,7 @@ func Test_DropSegment(t *testing.T) { t.Run("remove successfully", func(t *testing.T) { removedKvs := make(map[string]struct{}, 0) metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).RunAndReturn(func(s []string) error { + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(m map[string]string, s []string, p ...predicates.Predicate) error { for _, key := range s { removedKvs[key] = struct{}{} } @@ -450,8 +451,13 @@ func Test_DropSegment(t *testing.T) { err := catalog.DropSegment(context.TODO(), segment1) assert.NoError(t, err) + segKey := buildSegmentPath(segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + assert.Equal(t, 4, len(removedKvs)) - for _, k := range []string{k1, k2, k3, k5} { + for _, k := range []string{segKey, binlogPreix, deltalogPreix, statelogPreix} { _, ok := removedKvs[k] assert.True(t, ok) }