diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 7d3f332145..ae2f63d1a9 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -30,6 +30,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + kvi "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" ) @@ -549,7 +550,7 @@ func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...cl return err } if !resp.Succeeded { - return fmt.Errorf("function CompareValueAndSwap error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwap error for compare is false for key: %s", key)) } return nil @@ -570,7 +571,7 @@ func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte return err } if !resp.Succeeded { - return fmt.Errorf("function CompareValueAndSwapBytes error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwapBytes error for compare is false for key: %s", key)) } return nil @@ -591,7 +592,7 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) } return nil @@ -612,7 +613,7 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, tar return err } if !resp.Succeeded { - return fmt.Errorf("function CompareVersionAndSwapBytes error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareVersionAndSwapBytes error for compare is false for key: %s", key)) } return nil diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 75e05ddf10..84ec9b5537 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -17,9 +17,11 @@ package etcdkv_test import ( + "errors" "os" "testing" + "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/util/metricsinfo" embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -800,6 +802,7 @@ func TestEmbedEtcd(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } + var compareErr *kv.CompareFailedError err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") assert.NoError(t, err) @@ -809,12 +812,14 @@ func TestEmbedEtcd(te *testing.T) { err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") assert.NoError(t, err) err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) }) te.Run("Etcd Revision Bytes", func(t *testing.T) { @@ -852,6 +857,7 @@ func TestEmbedEtcd(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } + var compareErr *kv.CompareFailedError err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) @@ -861,12 +867,15 @@ func TestEmbedEtcd(te *testing.T) { err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) assert.NoError(t, err) err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) + }) te.Run("Etcd Lease", func(t *testing.T) { diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index f7583fc168..e43d1a118b 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -22,6 +22,7 @@ import ( "path" "time" + kvi "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" clientv3 "go.etcd.io/etcd/client/v3" @@ -567,7 +568,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) } CheckElapseAndWarn(start, "Slow etcd operation compare value and swap") return nil @@ -589,7 +590,7 @@ func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opt return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) } CheckElapseAndWarn(start, "Slow etcd operation compare value and swap") return nil @@ -611,8 +612,8 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ - " source version: %d, target version: %s", key, source, target) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ + " source version: %d, target version: %s", key, source, target)) } CheckElapseAndWarn(start, "Slow etcd operation compare version and swap") return nil @@ -634,8 +635,8 @@ func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target [] return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ - " source version: %d, target version: %s", key, source, target) + return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ + " source version: %d, target version: %s", key, source, target)) } CheckElapseAndWarn(start, "Slow etcd operation compare version and swap") return nil diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 7db3a6d78e..6be21fe72b 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -17,10 +17,12 @@ package etcdkv_test import ( + "errors" "os" "testing" "time" + "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -723,6 +725,7 @@ func TestEtcdKV_Load(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } + var compareErr *kv.CompareFailedError err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") assert.NoError(t, err) @@ -732,12 +735,14 @@ func TestEtcdKV_Load(te *testing.T) { err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") assert.NoError(t, err) err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) }) te.Run("Etcd Revision Bytes", func(t *testing.T) { @@ -772,6 +777,7 @@ func TestEtcdKV_Load(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } + var compareErr *kv.CompareFailedError err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) @@ -781,12 +787,14 @@ func TestEtcdKV_Load(te *testing.T) { err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) assert.NoError(t, err) err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) assert.Error(t, err) + assert.True(t, errors.As(err, &compareErr)) }) te.Run("Etcd Lease", func(t *testing.T) { diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 882824d6b0..c70e2888cf 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -21,6 +21,21 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) +// CompareFailedError is a helper type for checking MetaKv CompareAndSwap series func error type +type CompareFailedError struct { + internalError error +} + +// Error implements error interface +func (e *CompareFailedError) Error() string { + return e.internalError.Error() +} + +// NewCompareFailedError wraps error into NewCompareFailedError +func NewCompareFailedError(err error) error { + return &CompareFailedError{internalError: err} +} + // BaseKV contains base operations of kv. Include save, load and remove. type BaseKV interface { Load(key string) (string, error)