diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index a744416087..7fafa5ebdc 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -333,6 +333,7 @@ func TestMeta_Basic(t *testing.T) { metakv2.EXPECT().MultiRemove(mock.Anything).Return(errors.New("failed")).Maybe() metakv2.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() metakv2.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() + metakv2.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("failed")) catalog = datacoord.NewCatalog(metakv2, "", "") meta, err = newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index d41684ff2d..fa4272d4c9 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -565,6 +565,7 @@ func TestEmbedEtcd(te *testing.T) { {map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3}, {map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0}, {map[string]string{}, []string{"p"}, "p", 1, 0}, + {nil, []string{"p"}, "p", 0, 0}, } for _, test := range multiSaveAndRemoveWithPrefixTests { diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 8a5a7a3ae7..e0f67c783b 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -426,10 +426,12 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { segKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - keys := []string{segKey} - binlogKeys := buildBinlogKeys(segment) - keys = append(keys, binlogKeys...) - if err := kc.MetaKv.MultiRemove(keys); err != nil { + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) + + keys := []string{segKey, binlogPreix, deltalogPreix, statelogPreix} + if err := kc.MetaKv.MultiSaveAndRemoveWithPrefix(nil, keys); err != nil { return err } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index a87c2507ee..079c778589 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -429,7 +430,7 @@ func Test_AlterSegments(t *testing.T) { func Test_DropSegment(t *testing.T) { t.Run("remove failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).Return(errors.New("error")) + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error")) catalog := NewCatalog(metakv, rootPath, "") err := catalog.DropSegment(context.TODO(), segment1) @@ -439,7 +440,7 @@ func Test_DropSegment(t *testing.T) { t.Run("remove successfully", func(t *testing.T) { removedKvs := make(map[string]struct{}, 0) metakv := mocks.NewMetaKv(t) - metakv.EXPECT().MultiRemove(mock.Anything).RunAndReturn(func(s []string) error { + metakv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(m map[string]string, s []string, p ...predicates.Predicate) error { for _, key := range s { removedKvs[key] = struct{}{} } @@ -450,8 +451,13 @@ func Test_DropSegment(t *testing.T) { err := catalog.DropSegment(context.TODO(), segment1) assert.NoError(t, err) + segKey := buildSegmentPath(segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + binlogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + deltalogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + statelogPreix := fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, segment1.GetCollectionID(), segment1.GetPartitionID(), segment1.GetID()) + assert.Equal(t, 4, len(removedKvs)) - for _, k := range []string{k1, k2, k3, k5} { + for _, k := range []string{segKey, binlogPreix, deltalogPreix, statelogPreix} { _, ok := removedKvs[k] assert.True(t, ok) }