diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index fdbd695228..0d20a4bc85 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -90,14 +90,14 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, sa saveFn := func(partialKvs map[string]string) error { return snapshot.MultiSave(partialKvs, ts) } - if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum/2, saveFn); err != nil { + if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum, saveFn); err != nil { return err } removeFn := func(partialKeys []string) error { return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts) } - return etcd.RemoveByBatch(removals, removeFn) + return etcd.RemoveByBatchWithLimit(removals, maxTxnNum, removeFn) } func (kc *Catalog) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error { @@ -431,6 +431,8 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col // Though batchMultiSaveAndRemoveWithPrefix is not atomic enough, we can promise atomicity outside. // If we found collection under dropping state, we'll know that gc is not completely on this collection. // However, if we remove collection first, we cannot remove other metas. + // We set maxTxnNum to 64, since SnapshotKV may save both snapshot key and the original key if the original key is + // newest. if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil { return err } diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index 54c22f9c08..c6e4da3b84 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/zap" "golang.org/x/exp/maps" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -26,6 +27,7 @@ import ( pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -962,14 +964,22 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { t.Run("normal case", func(t *testing.T) { snapshot := kv.NewMockSnapshotKV() snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { + log.Info("multi save", zap.Any("len", len(kvs)), zap.Any("saves", kvs)) return nil } snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + log.Info("multi save and remove with prefix", zap.Any("len of saves", len(saves)), zap.Any("len of removals", len(removals)), + zap.Any("saves", saves), zap.Any("removals", removals)) return nil } - saves := map[string]string{"k": "v"} - removals := []string{"prefix1", "prefix2"} - err := batchMultiSaveAndRemoveWithPrefix(snapshot, maxTxnNum, saves, removals, 0) + n := 400 + saves := map[string]string{} + removals := make([]string, 0, n) + for i := 0; i < n; i++ { + saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i) + removals = append(removals, fmt.Sprintf("k%d", i)) + } + err := batchMultiSaveAndRemoveWithPrefix(snapshot, 64, saves, removals, 0) assert.NoError(t, err) }) } diff --git a/pkg/util/etcd/etcd_util.go b/pkg/util/etcd/etcd_util.go index 77dc0ce040..93a5114881 100644 --- a/pkg/util/etcd/etcd_util.go +++ b/pkg/util/etcd/etcd_util.go @@ -148,13 +148,13 @@ func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) er return SaveByBatchWithLimit(kvs, maxTxnNum, op) } -func RemoveByBatch(removals []string, op func(partialKeys []string) error) error { +func RemoveByBatchWithLimit(removals []string, limit int, op func(partialKeys []string) error) error { if len(removals) == 0 { return nil } - for i := 0; i < len(removals); i = i + maxTxnNum { - end := min(i+maxTxnNum, len(removals)) + for i := 0; i < len(removals); i = i + limit { + end := min(i+limit, len(removals)) batch := removals[i:end] if err := op(batch); err != nil { return err @@ -163,6 +163,10 @@ func RemoveByBatch(removals []string, op func(partialKeys []string) error) error return nil } +func RemoveByBatch(removals []string, op func(partialKeys []string) error) error { + return RemoveByBatchWithLimit(removals, maxTxnNum, op) +} + func buildKvGroup(keys, values []string) (map[string]string, error) { if len(keys) != len(values) { return nil, fmt.Errorf("length of keys (%d) and values (%d) are not equal", len(keys), len(values))