diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go
index 4a81d869b5..4e72a7cec8 100644
--- a/internal/indexcoord/index_coord.go
+++ b/internal/indexcoord/index_coord.go
@@ -28,6 +28,8 @@ import (
 	"syscall"
 	"time"
 
+	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+
 	"github.com/golang/protobuf/proto"
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -859,8 +861,7 @@ func (i *IndexCoord) watchMetaLoop() {
 	defer i.loopWg.Done()
 	log.Debug("IndexCoord watchMetaLoop start")
 
-	watchChan := i.metaTable.client.WatchWithPrefix(indexFilePrefix)
-
+	watchChan := i.metaTable.client.WatchWithRevision(indexFilePrefix, i.metaTable.revision)
 	for {
 		select {
 		case <-ctx.Done():
@@ -871,6 +872,22 @@ func (i *IndexCoord) watchMetaLoop() {
 				return
 			}
 			if err := resp.Err(); err != nil {
+				// The watched revision has been compacted: build a fresh meta table and restart
+				// this loop, which then starts its watch from the new table's revision.
+				if err == v3rpc.ErrCompacted {
+					// reloadErr is a distinct name so the watcher error stays visible for logging.
+					newMetaTable, reloadErr := NewMetaTable(i.metaTable.client)
+					if reloadErr != nil {
+						log.Error("Constructing new meta table fails when etcd has a compaction error",
+							zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(reloadErr))
+						panic("failed to handle etcd request, exit..")
+					}
+					// NOTE(review): i.metaTable is replaced without synchronization here - confirm no concurrent readers.
+					i.metaTable = newMetaTable
+					i.loopWg.Add(1) // balanced by the deferred Done of the restarted loop
+					go i.watchMetaLoop()
+					return
+				}
 				log.Error("received error event from etcd watcher", zap.String("path", indexFilePrefix), zap.Error(err))
 				panic("failed to handle etcd request, exit..")
 			}
diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go
index 2b3da6d26f..c62e48c5c4 100644
--- a/internal/indexcoord/index_coord_test.go
+++ b/internal/indexcoord/index_coord_test.go
@@ -18,6 +18,7 @@ package indexcoord
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"os"
 	"os/signal"
@@ -26,6 +27,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/milvus-io/milvus/internal/kv"
+	clientv3 "go.etcd.io/etcd/client/v3"
+
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
 
@@ -324,6 +328,109 @@ func TestIndexCoord_watchNodeLoop(t *testing.T) {
 	assert.True(t, closed)
 }
 
+// mockEtcdKv overrides only WatchWithRevision and LoadWithRevisionAndVersions; every other
+// kv.MetaKv method falls through to the embedded (nil) interface value.
+type mockEtcdKv struct {
+	kv.MetaKv
+
+	watchWithRevision           func(string, int64) clientv3.WatchChan
+	loadWithRevisionAndVersions func(string) ([]string, []string, []int64, int64, error)
+}
+
+func (mek *mockEtcdKv) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
+	return mek.watchWithRevision(key, revision)
+}
+
+func (mek *mockEtcdKv) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
+	return mek.loadWithRevisionAndVersions(key)
+}
+
+// TestIndexCoord_watchMetaLoop covers the watcher error paths: a canceled watch, a compaction
+// whose meta reload fails, and a compaction whose meta reload succeeds.
+func TestIndexCoord_watchMetaLoop(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ic := &IndexCoord{
+		loopCtx: ctx,
+		loopWg:  sync.WaitGroup{},
+	}
+
+	watchChan := make(chan clientv3.WatchResponse, 1024)
+
+	client := &mockEtcdKv{
+		watchWithRevision: func(s string, i int64) clientv3.WatchChan {
+			return watchChan
+		},
+	}
+	mt := &metaTable{
+		client:            client,
+		indexBuildID2Meta: map[UniqueID]Meta{},
+		revision:          0,
+		lock:              sync.RWMutex{},
+	}
+	ic.metaTable = mt
+
+	t.Run("watch chan panic", func(t *testing.T) {
+		ic.loopWg.Add(1)
+		watchChan <- clientv3.WatchResponse{Canceled: true}
+
+		assert.Panics(t, func() {
+			ic.watchMetaLoop()
+		})
+		ic.loopWg.Wait()
+	})
+
+	t.Run("watch chan new meta table panic", func(t *testing.T) {
+		client = &mockEtcdKv{
+			watchWithRevision: func(s string, i int64) clientv3.WatchChan {
+				return watchChan
+			},
+			loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
+				return []string{}, []string{}, []int64{}, 0, fmt.Errorf("error occurred")
+			},
+		}
+		mt = &metaTable{
+			client:            client,
+			indexBuildID2Meta: map[UniqueID]Meta{},
+			revision:          0,
+			lock:              sync.RWMutex{},
+		}
+		ic.metaTable = mt
+		ic.loopWg.Add(1)
+		watchChan <- clientv3.WatchResponse{CompactRevision: 10}
+		assert.Panics(t, func() {
+			ic.watchMetaLoop()
+		})
+		ic.loopWg.Wait()
+	})
+
+	t.Run("watch chan new meta success", func(t *testing.T) {
+		ic.loopWg = sync.WaitGroup{}
+		client = &mockEtcdKv{
+			watchWithRevision: func(s string, i int64) clientv3.WatchChan {
+				return watchChan
+			},
+			loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
+				// Cancel only once the reload is running, so the compaction branch is always
+				// exercised before the restarted loop is told to stop.
+				cancel()
+				return []string{}, []string{}, []int64{}, 0, nil
+			},
+		}
+		mt = &metaTable{
+			client:            client,
+			indexBuildID2Meta: map[UniqueID]Meta{},
+			revision:          0,
+			lock:              sync.RWMutex{},
+		}
+		ic.metaTable = mt
+		ic.loopWg.Add(1)
+		watchChan <- clientv3.WatchResponse{CompactRevision: 10}
+		go ic.watchMetaLoop()
+		ic.loopWg.Wait()
+	})
+}
+
 func TestIndexCoord_GetComponentStates(t *testing.T) {
 	n := &IndexCoord{}
 	n.stateCode.Store(internalpb.StateCode_Healthy)
diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go
index 24c4aeb73c..718c1e6617 100644
--- a/internal/indexcoord/meta_table.go
+++ b/internal/indexcoord/meta_table.go
@@ -25,12 +25,13 @@ import (
 	"strconv"
 	"sync"
 
+	"github.com/milvus-io/milvus/internal/kv"
+
 	"github.com/milvus-io/milvus/internal/metrics"
 
 	"go.uber.org/zap"
 
 	"github.com/golang/protobuf/proto"
-	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -47,14 +48,16 @@ type Meta struct {
 
 // metaTable records the mapping of IndexBuildID to Meta.
 type metaTable struct {
-	client            *etcdkv.EtcdKV    // client of a reliable kv service, i.e. etcd client
+	client            kv.MetaKv         // client of a reliable kv service, i.e. etcd client
 	indexBuildID2Meta map[UniqueID]Meta // index build id to index meta
 
+	revision int64 // store revision recorded by reloadFromKV; watchMetaLoop starts its watch from it
+
 	lock sync.RWMutex
 }
 
 // NewMetaTable is used to create a new meta table.
-func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
+func NewMetaTable(kv kv.MetaKv) (*metaTable, error) {
 	mt := &metaTable{
 		client: kv,
 		lock:   sync.RWMutex{},
@@ -73,11 +76,13 @@ func (mt *metaTable) reloadFromKV() error {
 	key := indexFilePrefix
 	log.Debug("IndexCoord metaTable LoadWithPrefix ", zap.String("prefix", key))
 
-	_, values, versions, err := mt.client.LoadWithPrefix2(key)
+	_, values, versions, revision, err := mt.client.LoadWithRevisionAndVersions(key)
 	if err != nil {
 		return err
 	}
 
+	mt.revision = revision
+
 	for i := 0; i < len(values); i++ {
 		indexMeta := indexpb.IndexMeta{}
 		err = proto.Unmarshal([]byte(values[i]), &indexMeta)
diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go
index f1185ac8ca..08bfb112f8 100644
--- a/internal/kv/etcd/embed_etcd_kv.go
+++ b/internal/kv/etcd/embed_etcd_kv.go
@@ -147,6 +147,29 @@ func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64,
 	return keys, values, versions, nil
 }
 
+// LoadWithRevisionAndVersions returns the keys, values and key versions under the given key
+// prefix, together with the store revision reported in the response header.
+func (kv *EmbedEtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
+	key = path.Join(kv.rootPath, key)
+	log.Debug("LoadWithRevisionAndVersions ", zap.String("prefix", key))
+	ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
+	defer cancel()
+	resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
+		clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
+	if err != nil {
+		return nil, nil, nil, 0, err
+	}
+	keys := make([]string, 0, resp.Count)
+	values := make([]string, 0, resp.Count)
+	versions := make([]int64, 0, resp.Count)
+	for _, pair := range resp.Kvs { // named pair so the receiver kv is not shadowed
+		keys = append(keys, string(pair.Key))
+		values = append(values, string(pair.Value))
+		versions = append(versions, pair.Version)
+	}
+	return keys, values, versions, resp.Header.Revision, nil
+}
+
 // LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix
 func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
 	key = path.Join(kv.rootPath, key)
diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go
index 3833456b8f..705a4e9b6e 100644
--- a/internal/kv/etcd/embed_etcd_kv_test.go
+++ b/internal/kv/etcd/embed_etcd_kv_test.go
@@ -128,6 +128,13 @@ func TestEmbedEtcd(te *testing.T) {
 			assert.ElementsMatch(t, test.expectedValues, actualValues)
 			assert.NotZero(t, versions)
 			assert.Equal(t, test.expectedError, err)
+
+			actualKeys, actualValues, versions, revision, err := metaKv.LoadWithRevisionAndVersions(test.prefix)
+			assert.ElementsMatch(t, test.expectedKeys, actualKeys)
+			assert.ElementsMatch(t, test.expectedValues, actualValues)
+			assert.NotZero(t, versions)
+			assert.NotZero(t, revision)
+			assert.Equal(t, test.expectedError, err)
 		}
 
 		removeTests := []struct {
diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go
index 53a08bc628..e7abd0ef74 100644
--- a/internal/kv/etcd/etcd_kv.go
+++ b/internal/kv/etcd/etcd_kv.go
@@ -124,6 +124,30 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro
 	return keys, values, versions, nil
 }
 
+// LoadWithRevisionAndVersions returns the keys, values and key versions under the given key
+// prefix, together with the store revision reported in the response header.
+func (kv *EtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
+	start := time.Now()
+	key = path.Join(kv.rootPath, key)
+	ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
+	defer cancel()
+	resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
+		clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
+	if err != nil {
+		return nil, nil, nil, 0, err
+	}
+	keys := make([]string, 0, resp.Count)
+	values := make([]string, 0, resp.Count)
+	versions := make([]int64, 0, resp.Count)
+	for _, pair := range resp.Kvs { // named pair so the receiver kv is not shadowed
+		keys = append(keys, string(pair.Key))
+		values = append(values, string(pair.Value))
+		versions = append(versions, pair.Version)
+	}
+	CheckElapseAndWarn(start, "Slow etcd operation load with revision and versions")
+	return keys, values, versions, resp.Header.Revision, nil
+}
+
 // LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
 func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
 	start := time.Now()
diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go
index e91933e530..7e939ab5b3 100644
--- a/internal/kv/etcd/etcd_kv_test.go
+++ b/internal/kv/etcd/etcd_kv_test.go
@@ -126,6 +126,13 @@ func TestEtcdKV_Load(te *testing.T) {
 			assert.ElementsMatch(t, test.expectedValues, actualValues)
 			assert.NotZero(t, versions)
 			assert.Equal(t, test.expectedError, err)
+
+			actualKeys, actualValues, versions, revision, err := etcdKV.LoadWithRevisionAndVersions(test.prefix)
+			assert.ElementsMatch(t, test.expectedKeys, actualKeys)
+			assert.ElementsMatch(t, test.expectedValues, actualValues)
+			assert.NotZero(t, versions)
+			assert.NotZero(t, revision)
+			assert.Equal(t, test.expectedError, err)
 		}
 
 		removeTests := []struct {
diff --git a/internal/kv/kv.go b/internal/kv/kv.go
index de2f6cd312..524ffc1a54 100644
--- a/internal/kv/kv.go
+++ b/internal/kv/kv.go
@@ -64,6 +64,7 @@ type MetaKv interface {
 	GetPath(key string) string
 	LoadWithPrefix(key string) ([]string, []string, error)
 	LoadWithPrefix2(key string) ([]string, []string, []int64, error)
+	LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) // keys, values, key versions, store revision
 	LoadWithRevision(key string) ([]string, []string, int64, error)
 	Watch(key string) clientv3.WatchChan
 	WatchWithPrefix(key string) clientv3.WatchChan
diff --git a/internal/kv/mock_kv.go b/internal/kv/mock_kv.go
index e0e4ebad1b..505e1ddbd0 100644
--- a/internal/kv/mock_kv.go
+++ b/internal/kv/mock_kv.go
@@ -107,6 +107,10 @@ func (m *MockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, e
 	panic("not implemented") // TODO: Implement
 }
 
+func (m *MockMetaKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
+	panic("not implemented") // TODO: Implement
+}
+
 func (m *MockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
 	panic("not implemented") // TODO: Implement
 }
diff --git a/internal/kv/mock_kv_test.go b/internal/kv/mock_kv_test.go
index ebd8bc8745..a8cc00d9ad 100644
--- a/internal/kv/mock_kv_test.go
+++ b/internal/kv/mock_kv_test.go
@@ -90,6 +90,10 @@ func TestMockKV_MetaKV(t *testing.T) {
 		mockKv.LoadWithPrefix2(testKey)
 	})
 
+	assert.Panics(t, func() {
+		mockKv.LoadWithRevisionAndVersions(testKey)
+	})
+
 	assert.Panics(t, func() {
 		mockKv.LoadWithRevision(testKey)
 	})
diff --git a/internal/querycoord/replica_test.go b/internal/querycoord/replica_test.go
index 59c3acd7e5..49ff333101 100644
--- a/internal/querycoord/replica_test.go
+++ b/internal/querycoord/replica_test.go
@@ -89,6 +89,10 @@ func (m *mockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, e
 	panic("not implemented") // TODO: Implement
 }
 
+func (m *mockMetaKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
+	panic("not implemented") // TODO: Implement
+}
+
 func (m *mockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
 	panic("not implemented") // TODO: Implement
 }