fix: Drop segment meta info with prefix (#29857)

pr: #29856
If segment has more than 128 log fils, drop segment will exceed etcd txn
ops limit, which will failed the drop segment request
This PR drop segment meta info with prefix, to avoid drop segment meta
failed

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-01-11 15:02:50 +08:00 committed by GitHub
parent 322e9f39a3
commit 603cd1fb3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 7 deletions

View File

@ -333,6 +333,7 @@ func TestMeta_Basic(t *testing.T) {
metakv2.EXPECT().MultiRemove(mock.Anything).Return(errors.New("failed")).Maybe()
metakv2.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
metakv2.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
metakv2.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("failed"))
catalog = datacoord.NewCatalog(metakv2, "", "")
meta, err = newMeta(context.TODO(), catalog, nil)
assert.NoError(t, err)

View File

@ -565,6 +565,7 @@ func TestEmbedEtcd(te *testing.T) {
{map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3},
{map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0},
{map[string]string{}, []string{"p"}, "p", 1, 0},
{nil, []string{"p"}, "p", 0, 0},
}
for _, test := range multiSaveAndRemoveWithPrefixTests {

View File

@ -437,10 +437,12 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d
func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
segKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
keys := []string{segKey}
binlogKeys := buildBinlogKeys(segment)
keys = append(keys, binlogKeys...)
if err := kc.MetaKv.MultiRemove(keys); err != nil {
binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
keys := []string{segKey, binlogPreix, deltalogPreix, statelogPreix}
if err := kc.MetaKv.MultiSaveAndRemoveWithPrefix(nil, keys); err != nil {
return err
}

View File

@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -429,7 +430,7 @@ func Test_AlterSegments(t *testing.T) {
func Test_DropSegment(t *testing.T) {
t.Run("remove failed", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().MultiRemove(mock.Anything).Return(errors.New("error"))
metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error"))
catalog := NewCatalog(metakv, rootPath, "")
err := catalog.DropSegment(context.TODO(), segment1)
@ -439,7 +440,7 @@ func Test_DropSegment(t *testing.T) {
t.Run("remove successfully", func(t *testing.T) {
removedKvs := make(map[string]struct{}, 0)
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().MultiRemove(mock.Anything).RunAndReturn(func(s []string) error {
metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(m map[string]string, s []string, p ...predicates.Predicate) error {
for _, key := range s {
removedKvs[key] = struct{}{}
}
@ -450,8 +451,13 @@ func Test_DropSegment(t *testing.T) {
err := catalog.DropSegment(context.TODO(), segment1)
assert.NoError(t, err)
segKey := buildSegmentPath(segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID())
binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID())
deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID())
statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID())
assert.Equal(t, 4, len(removedKvs))
for _, k := range []string{k1, k2, k3, k5} {
for _, k := range []string{segKey, binlogPreix, deltalogPreix, statelogPreix} {
_, ok := removedKvs[k]
assert.True(t, ok)
}