diff --git a/docs/developer_guides/appendix_a_basic_components.md b/docs/developer_guides/appendix_a_basic_components.md index 34572cefa2..b82ff5093d 100644 --- a/docs/developer_guides/appendix_a_basic_components.md +++ b/docs/developer_guides/appendix_a_basic_components.md @@ -426,15 +426,9 @@ type MetaKv interface { TxnKV GetPath(key string) string LoadWithPrefix(key string) ([]string, []string, error) - LoadWithPrefix2(key string) ([]string, []string, []int64, error) - LoadWithRevision(key string) ([]string, []string, int64, error) Watch(key string) clientv3.WatchChan WatchWithPrefix(key string) clientv3.WatchChan WatchWithRevision(key string, revision int64) clientv3.WatchChan - SaveWithLease(key, value string, id clientv3.LeaseID) error - Grant(ttl int64) (id clientv3.LeaseID, err error) - KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) - CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index d2f1fb9581..5e52797f95 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -67,18 +67,6 @@ func (mm *metaMemoryKV) GetPath(key string) string { panic("implement me") } -func (mm *metaMemoryKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { - panic("implement me") -} - -func (mm *metaMemoryKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) { - panic("implement me") -} - -func (mm *metaMemoryKV) LoadWithRevision(key string) ([]string, []string, int64, error) { - panic("implement me") -} - func (mm *metaMemoryKV) Watch(key string) clientv3.WatchChan { panic("implement me") } @@ -91,26 +79,6 @@ func (mm *metaMemoryKV) WatchWithRevision(key string, revision int64) clientv3.W panic("implement me") } -func (mm *metaMemoryKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { - panic("implement me") -} - -func (mm *metaMemoryKV) SaveWithIgnoreLease(key, value string) error { - panic("implement me") -} - -func (mm *metaMemoryKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { - panic("implement me") -} - -func (mm *metaMemoryKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { - panic("implement me") -} - -func (mm *metaMemoryKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { - panic("implement me") -} - func (mm *metaMemoryKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { panic("implement me") } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index e67261d124..82897862c8 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -38,6 +38,7 @@ import ( datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" + "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -105,7 +106,7 @@ type Server struct { etcdCli *clientv3.Client address string - kvClient *etcdkv.EtcdKV + kvClient kv.MetaKv meta *meta segmentManager Manager allocator allocator diff --git a/internal/indexnode/task_test.go b/internal/indexnode/task_test.go index 9aa24bc67d..06f0c2239a 100644 --- a/internal/indexnode/task_test.go +++ b/internal/indexnode/task_test.go @@ -134,10 +134,6 @@ package indexnode // loadWithPrefix2 func(key string) ([]string, []string, []int64, error) // } -// func (mk *mockETCDKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { -// return mk.loadWithPrefix2(key) -// } - // func TestIndexBuildTask_loadIndexMeta(t *testing.T) { // t.Run("load empty meta", func(t *testing.T) { // indexTask := &IndexBuildTask{ diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 1090fef55f..949066256f 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -159,49 +159,6 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, erro return keys, values, nil } -// LoadWithPrefix2 returns all the keys and values with versions by the given key prefix -func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { - key = path.Join(kv.rootPath, key) - log.Debug("LoadWithPrefix ", zap.String("prefix", key)) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, nil, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - versions := make([]int64, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - versions = append(versions, kv.Version) - } - return keys, values, versions, nil -} - -func (kv *EmbedEtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) { - key = path.Join(kv.rootPath, key) - log.Debug("LoadWithPrefix ", zap.String("prefix", key)) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, nil, 0, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - versions := make([]int64, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - versions = append(versions, kv.Version) - } - return keys, values, versions, resp.Header.Revision, nil -} - // LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) { key = path.Join(kv.rootPath, key) @@ -328,26 +285,6 @@ func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { return result, nil } -// LoadWithRevision returns keys, values and revision with given key prefix. -func (kv *EmbedEtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) { - key = path.Join(kv.rootPath, key) - log.Debug("LoadWithPrefix ", zap.String("prefix", key)) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, 0, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - } - return keys, values, resp.Header.Revision, nil -} - // LoadBytesWithRevision returns keys, values and revision with given key prefix. func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) { key = path.Join(kv.rootPath, key) @@ -386,26 +323,6 @@ func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error { return err } -// SaveWithLease is a function to put value in etcd with etcd lease options. -func (kv *EmbedEtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { - log.Debug("Embedded Etcd saving with lease", zap.String("etcd_key", key)) - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - _, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id)) - return err -} - -// SaveWithIgnoreLease updates the key without changing its current lease. -func (kv *EmbedEtcdKV) SaveWithIgnoreLease(key, value string) error { - log.Debug("Embedded Etcd saving with ignore lease", zap.String("etcd_key", key)) - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - _, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease()) - return err -} - // SaveBytesWithLease is a function to put value in etcd with etcd lease options. func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error { key = path.Join(kv.rootPath, key) @@ -580,56 +497,6 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte return err } -// Grant creates a new lease implemented in etcd grant interface. -func (kv *EmbedEtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { - resp, err := kv.client.Grant(context.Background(), ttl) - return resp.ID, err -} - -// KeepAlive keeps the lease alive forever with leaseID. -// Implemented in etcd interface. -func (kv *EmbedEtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { - ch, err := kv.client.KeepAlive(context.Background(), id) - if err != nil { - return nil, err - } - return ch, nil -} - -// CompareValueAndSwap compares the existing value with compare, and if they are -// equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Value(path.Join(kv.rootPath, key)), - "=", - value)). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() - if err != nil { - return false, err - } - return resp.Succeeded, nil -} - -// CompareValueAndSwapBytes compares the existing value with compare, and if they are -// equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) { - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Value(path.Join(kv.rootPath, key)), - "=", - string(value))). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() - if err != nil { - return false, err - } - return resp.Succeeded, nil -} - // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index b868eebe02..06d0c5782f 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -20,12 +20,10 @@ import ( "fmt" "sort" "testing" - "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" "golang.org/x/exp/maps" embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -122,19 +120,6 @@ func TestEmbedEtcd(te *testing.T) { assert.ElementsMatch(t, test.expectedKeys, actualKeys) assert.ElementsMatch(t, test.expectedValues, actualValues) assert.Equal(t, test.expectedError, err) - - actualKeys, actualValues, versions, err := metaKv.LoadWithPrefix2(test.prefix) - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualValues) - assert.NotZero(t, versions) - assert.Equal(t, test.expectedError, err) - - actualKeys, actualValues, versions, revision, err := metaKv.LoadWithRevisionAndVersions(test.prefix) - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualValues) - assert.NotZero(t, versions) - assert.NotZero(t, revision) - assert.Equal(t, test.expectedError, err) } removeTests := []struct { @@ -272,51 +257,6 @@ func TestEmbedEtcd(te *testing.T) { } }) - te.Run("EtcdKV LoadWithRevision", func(t *testing.T) { - rootPath := "/etcd/test/root/LoadWithRevision" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) - assert.Nil(t, err) - - defer metaKv.Close() - defer metaKv.RemoveWithPrefix("") - - prepareKV := []struct { - inKey string - inValue string - }{ - {"a", "a_version1"}, - {"b", "b_version2"}, - {"a", "a_version3"}, - {"c", "c_version4"}, - {"a/suba", "a_version5"}, - } - - for _, test := range prepareKV { - err = metaKv.Save(test.inKey, test.inValue) - require.NoError(t, err) - } - - loadWithRevisionTests := []struct { - inKey string - - expectedKeyNo int - expectedValues []string - }{ - {"a", 2, []string{"a_version3", "a_version5"}}, - {"b", 1, []string{"b_version2"}}, - {"c", 1, []string{"c_version4"}}, - } - - for _, test := range loadWithRevisionTests { - keys, values, revision, err := metaKv.LoadWithRevision(test.inKey) - assert.NoError(t, err) - assert.Equal(t, test.expectedKeyNo, len(keys)) - assert.ElementsMatch(t, test.expectedValues, values) - assert.NotZero(t, revision) - } - - }) - te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { rootPath := "/etcd/test/root/LoadBytesWithRevision" _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -776,61 +716,6 @@ func TestEmbedEtcd(te *testing.T) { assert.True(t, resp.Created) }) - te.Run("Etcd Revision", func(t *testing.T) { - rootPath := "/etcd/test/root/watch" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) - assert.Nil(t, err) - - defer metaKv.Close() - defer metaKv.RemoveWithPrefix("") - - revisionTests := []struct { - inKey string - fistValue string - secondValue string - }{ - {"a", "v1", "v11"}, - {"y", "v2", "v22"}, - {"z", "v3", "v33"}, - } - - for _, test := range revisionTests { - err = metaKv.Save(test.inKey, test.fistValue) - require.NoError(t, err) - - _, _, revision, _ := metaKv.LoadWithRevision(test.inKey) - ch := metaKv.WatchWithRevision(test.inKey, revision+1) - - err = metaKv.Save(test.inKey, test.secondValue) - require.NoError(t, err) - - resp := <-ch - assert.Equal(t, 1, len(resp.Events)) - assert.Equal(t, test.secondValue, string(resp.Events[0].Kv.Value)) - assert.Equal(t, revision+1, resp.Header.Revision) - } - - success, err := metaKv.CompareVersionAndSwap("a/b/c", 0, "1") - assert.NoError(t, err) - assert.True(t, success) - - value, err := metaKv.Load("a/b/c") - assert.NoError(t, err) - assert.Equal(t, value, "1") - - success, err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") - assert.NoError(t, err) - assert.False(t, success) - - success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") - assert.True(t, success) - assert.NoError(t, err) - - success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") - assert.NoError(t, err) - assert.False(t, success) - }) - te.Run("Etcd Revision Bytes", func(t *testing.T) { rootPath := "/etcd/test/root/revision_bytes" _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -877,86 +762,6 @@ func TestEmbedEtcd(te *testing.T) { success, err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) assert.False(t, success) - - success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.True(t, success) - assert.NoError(t, err) - - success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.NoError(t, err) - assert.False(t, success) - - }) - - te.Run("Etcd Lease", func(t *testing.T) { - rootPath := "/etcd/test/root/lease" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) - assert.Nil(t, err) - - defer metaKv.Close() - defer metaKv.RemoveWithPrefix("") - - leaseID, err := metaKv.Grant(10) - assert.NoError(t, err) - - metaKv.KeepAlive(leaseID) - - tests := map[string]string{ - "a/b": "v1", - "a/b/c": "v2", - "x": "v3", - } - - for k, v := range tests { - // SaveWithIgnoreLease must be used when the key already exists. - err = metaKv.SaveWithIgnoreLease(k, v) - assert.Error(t, err) - - err = metaKv.SaveWithLease(k, v, leaseID) - assert.NoError(t, err) - - err = metaKv.SaveWithLease(k, v, clientv3.LeaseID(999)) - assert.Error(t, err) - } - - }) - - te.Run("Etcd Lease Ignore", func(t *testing.T) { - rootPath := "/etcd/test/root/lease_ignore" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) - assert.Nil(t, err) - - defer metaKv.Close() - defer metaKv.RemoveWithPrefix("") - - tests := map[string]string{ - "a/b": "v1", - "a/b/c": "v2", - "x": "v3", - } - - for k, v := range tests { - leaseID, err := metaKv.Grant(1) - assert.NoError(t, err) - - err = metaKv.SaveWithLease(k, v, leaseID) - assert.NoError(t, err) - - err = metaKv.SaveWithIgnoreLease(k, "updated_"+v) - assert.NoError(t, err) - - // Record should be updated correctly. - value, err := metaKv.Load(k) - assert.NoError(t, err) - assert.Equal(t, "updated_"+v, value) - - // Let the lease expire. 3 seconds should be pretty safe. - time.Sleep(3 * time.Second) - - // Updated record should still expire with lease. - _, err = metaKv.Load(k) - assert.Error(t, err) - } }) te.Run("Etcd WalkWithPagination", func(t *testing.T) { diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 950318afcd..6cfc89d29d 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -140,51 +140,6 @@ func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { return keys, values, nil } -// LoadWithPrefix2 returns all the the keys,values and key versions with the given key prefix. -func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { - start := time.Now() - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, nil, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - versions := make([]int64, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - versions = append(versions, kv.Version) - } - CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys)) - return keys, values, versions, nil -} - -func (kv *EtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) { - start := time.Now() - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, nil, 0, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - versions := make([]int64, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - versions = append(versions, kv.Version) - } - CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys)) - return keys, values, versions, resp.Header.Revision, nil -} - // LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix. func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) { start := time.Now() @@ -312,27 +267,6 @@ func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { return result, nil } -// LoadWithRevision returns keys, values and revision with given key prefix. -func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) { - start := time.Now() - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend)) - if err != nil { - return nil, nil, 0, err - } - keys := make([]string, 0, resp.Count) - values := make([]string, 0, resp.Count) - for _, kv := range resp.Kvs { - keys = append(keys, string(kv.Key)) - values = append(values, string(kv.Value)) - } - CheckElapseAndWarn(start, "Slow etcd operation load with revision", zap.Strings("keys", keys)) - return keys, values, resp.Header.Revision, nil -} - // LoadBytesWithRevision returns keys, values and revision with given key prefix. func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) { start := time.Now() @@ -378,32 +312,6 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error { return err } -// SaveWithLease is a function to put value in etcd with etcd lease options. -func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { - log.Debug("Etcd saving with lease", zap.String("etcd_key", key)) - start := time.Now() - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - CheckValueSizeAndWarn(key, value) - _, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id)) - CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key)) - return err -} - -// SaveWithIgnoreLease updates the key without changing its current lease. Must be used when key already exists. -func (kv *EtcdKV) SaveWithIgnoreLease(key, value string) error { - log.Debug("Etcd saving with ignore lease", zap.String("etcd_key", key)) - start := time.Now() - key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - CheckValueSizeAndWarn(key, value) - _, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease()) - CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key)) - return err -} - // SaveBytesWithLease is a function to put value in etcd with etcd lease options. func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error { start := time.Now() @@ -669,64 +577,6 @@ func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem return err } -// Grant creates a new lease implemented in etcd grant interface. -func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { - start := time.Now() - resp, err := kv.client.Grant(context.Background(), ttl) - CheckElapseAndWarn(start, "Slow etcd operation grant") - return resp.ID, err -} - -// KeepAlive keeps the lease alive forever with leaseID. -// Implemented in etcd interface. -func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { - start := time.Now() - ch, err := kv.client.KeepAlive(context.Background(), id) - if err != nil { - return nil, err - } - CheckElapseAndWarn(start, "Slow etcd operation keepAlive") - return ch, nil -} - -// CompareValueAndSwap compares the existing value with compare, and if they are -// equal, the target is stored in etcd. -func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { - start := time.Now() - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Value(path.Join(kv.rootPath, key)), - "=", - value)). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() - if err != nil { - return false, err - } - CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key)) - return resp.Succeeded, nil -} - -// CompareValueAndSwapBytes compares the existing value with compare, and if they are -// equal, the target is stored in etcd. -func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) { - start := time.Now() - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Value(path.Join(kv.rootPath, key)), - "=", - string(value))). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() - if err != nil { - return false, err - } - CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key)) - return resp.Succeeded, nil -} - // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) { diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 0ccabe8f41..66f18ec016 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" "golang.org/x/exp/maps" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -131,19 +130,6 @@ func TestEtcdKV_Load(te *testing.T) { assert.ElementsMatch(t, test.expectedKeys, actualKeys) assert.ElementsMatch(t, test.expectedValues, actualValues) assert.Equal(t, test.expectedError, err) - - actualKeys, actualValues, versions, err := etcdKV.LoadWithPrefix2(test.prefix) - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualValues) - assert.NotZero(t, versions) - assert.Equal(t, test.expectedError, err) - - actualKeys, actualValues, versions, revision, err := etcdKV.LoadWithRevisionAndVersions(test.prefix) - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualValues) - assert.NotZero(t, versions) - assert.NotZero(t, revision) - assert.Equal(t, test.expectedError, err) } removeTests := []struct { @@ -287,52 +273,8 @@ func TestEtcdKV_Load(te *testing.T) { } }) - te.Run("EtcdKV LoadWithRevision", func(t *testing.T) { - rootPath := "/etcd/test/root/LoadWithRevision" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - prepareKV := []struct { - inKey string - inValue string - }{ - {"a", "a_version1"}, - {"b", "b_version2"}, - {"a", "a_version3"}, - {"c", "c_version4"}, - {"a/suba", "a_version5"}, - } - - for _, test := range prepareKV { - err = etcdKV.Save(test.inKey, test.inValue) - require.NoError(t, err) - } - - loadWithRevisionTests := []struct { - inKey string - - expectedKeyNo int - expectedValues []string - }{ - {"a", 2, []string{"a_version3", "a_version5"}}, - {"b", 1, []string{"b_version2"}}, - {"c", 1, []string{"c_version4"}}, - } - - for _, test := range loadWithRevisionTests { - keys, values, revision, err := etcdKV.LoadWithRevision(test.inKey) - assert.NoError(t, err) - assert.Equal(t, test.expectedKeyNo, len(keys)) - assert.ElementsMatch(t, test.expectedValues, values) - assert.NotZero(t, revision) - } - - }) - te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { - rootPath := "/etcd/test/root/LoadWithRevision" + rootPath := "/etcd/test/root/LoadBytesWithRevision" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) defer etcdKV.Close() @@ -711,59 +653,6 @@ func TestEtcdKV_Load(te *testing.T) { assert.True(t, resp.Created) }) - te.Run("Etcd Revision", func(t *testing.T) { - rootPath := "/etcd/test/root/revision" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - revisionTests := []struct { - inKey string - fistValue string - secondValue string - }{ - {"a", "v1", "v11"}, - {"y", "v2", "v22"}, - {"z", "v3", "v33"}, - } - - for _, test := range revisionTests { - err = etcdKV.Save(test.inKey, test.fistValue) - require.NoError(t, err) - - _, _, revision, _ := etcdKV.LoadWithRevision(test.inKey) - ch := etcdKV.WatchWithRevision(test.inKey, revision+1) - - err = etcdKV.Save(test.inKey, test.secondValue) - require.NoError(t, err) - - resp := <-ch - assert.Equal(t, 1, len(resp.Events)) - assert.Equal(t, test.secondValue, string(resp.Events[0].Kv.Value)) - assert.Equal(t, revision+1, resp.Header.Revision) - } - - success, err := etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") - assert.NoError(t, err) - assert.True(t, success) - - value, err := etcdKV.Load("a/b/c") - assert.NoError(t, err) - assert.Equal(t, value, "1") - - success, err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") - assert.NoError(t, err) - assert.False(t, success) - - success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") - assert.True(t, success) - assert.NoError(t, err) - - success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") - assert.NoError(t, err) - assert.False(t, success) - }) - te.Run("Etcd Revision Bytes", func(t *testing.T) { rootPath := "/etcd/test/root/revision_bytes" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -807,109 +696,6 @@ func TestEtcdKV_Load(te *testing.T) { success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) assert.False(t, success) - - success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.True(t, success) - assert.NoError(t, err) - - success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.NoError(t, err) - assert.False(t, success) - }) - - te.Run("Etcd Lease", func(t *testing.T) { - rootPath := "/etcd/test/root/lease" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - leaseID, err := etcdKV.Grant(10) - assert.NoError(t, err) - - etcdKV.KeepAlive(leaseID) - - tests := map[string]string{ - "a/b": "v1", - "a/b/c": "v2", - "x": "v3", - } - - for k, v := range tests { - // SaveWithIgnoreLease must be used when the key already exists. - err = etcdKV.SaveWithIgnoreLease(k, v) - assert.Error(t, err) - - err = etcdKV.SaveWithLease(k, v, leaseID) - assert.NoError(t, err) - - err = etcdKV.SaveWithLease(k, v, clientv3.LeaseID(999)) - assert.Error(t, err) - } - }) - - te.Run("Etcd Lease Ignore", func(t *testing.T) { - rootPath := "/etcd/test/root/lease_ignore" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - tests := map[string]string{ - "a/b": "v1", - "a/b/c": "v2", - "x": "v3", - } - - for k, v := range tests { - leaseID, err := etcdKV.Grant(1) - assert.NoError(t, err) - - err = etcdKV.SaveWithLease(k, v, leaseID) - assert.NoError(t, err) - - err = etcdKV.SaveWithIgnoreLease(k, "updated_"+v) - assert.NoError(t, err) - - // Record should be updated correctly. - value, err := etcdKV.Load(k) - assert.NoError(t, err) - assert.Equal(t, "updated_"+v, value) - - // Let the lease expire. 3 seconds should be pretty safe. - time.Sleep(3 * time.Second) - - // Updated record should still expire with lease. - _, err = etcdKV.Load(k) - assert.Error(t, err) - } - }) - - te.Run("Etcd Lease Bytes", func(t *testing.T) { - rootPath := "/etcd/test/root/lease_bytes" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - leaseID, err := etcdKV.Grant(10) - assert.NoError(t, err) - - etcdKV.KeepAlive(leaseID) - - tests := map[string][]byte{ - "a/b": []byte("v1"), - "a/b/c": []byte("v2"), - "x": []byte("v3"), - } - - for k, v := range tests { - err = etcdKV.SaveBytesWithLease(k, v, leaseID) - assert.NoError(t, err) - - err = etcdKV.SaveBytesWithLease(k, v, clientv3.LeaseID(999)) - assert.Error(t, err) - } }) } diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 04875dd613..145f3c4d39 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -67,17 +67,9 @@ type MetaKv interface { TxnKV GetPath(key string) string LoadWithPrefix(key string) ([]string, []string, error) - LoadWithPrefix2(key string) ([]string, []string, []int64, error) - LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) - LoadWithRevision(key string) ([]string, []string, int64, error) Watch(key string) clientv3.WatchChan WatchWithPrefix(key string) clientv3.WatchChan WatchWithRevision(key string, revision int64) clientv3.WatchChan - SaveWithLease(key, value string, id clientv3.LeaseID) error - SaveWithIgnoreLease(key, value string) error - Grant(ttl int64) (id clientv3.LeaseID, err error) - KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) - CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index d5e078b0f6..39617d52ce 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -45,7 +45,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/pipeline" @@ -118,7 +117,6 @@ type QueryNode struct { cacheChunkManager storage.ChunkManager vectorStorage storage.ChunkManager - etcdKV *etcdkv.EtcdKV /* // Pool for search/query @@ -274,7 +272,6 @@ func (node *QueryNode) Init() error { return } - node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, paramtable.Get().EtcdCfg.MetaRootPath.GetValue()) log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue())) node.scheduler = tasks.NewScheduler()