diff --git a/docs/developer_guides/appendix_a_basic_components.md b/docs/developer_guides/appendix_a_basic_components.md index f073738ace..ee172a701d 100644 --- a/docs/developer_guides/appendix_a_basic_components.md +++ b/docs/developer_guides/appendix_a_basic_components.md @@ -86,9 +86,12 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic ###### Interface ```go -default_ttl = 10 -default_retry_times = 3 +const defaultIDKey = "services/id" +const defaultRetryTimes = 30 +// Session is a struct to store service's session, including ServerID, ServerName, +// Address. +// LeaseID will be assigned after registered in etcd. type Session struct { ServerID int64 ServerName string @@ -96,9 +99,26 @@ type Session struct { LeaseID clientv3.LeaseID } -// GetServerID gets id from etcd with key: metaRootPath + "/services/ServerID" -// Each server get ServerID and add one to ServerID -GetServerID(){} +var ( + globalServerID = int64(-1) +) + +// NewSession is a helper to build Session object.LeaseID will be assigned after +// registeration. +func NewSession(serverID int64, serverName, address string) *Session {} + +// GlobalServerID returns [singleton] ServerID. +// Before SetGlobalServerID, GlobalServerID() returns -1 +func GlobalServerID() int64 {} + +// SetGlobalServerID sets the [singleton] ServerID. ServerID returned by +// GlobalServerID(). Those who use GlobalServerID should call SetGlobalServerID() +// as early as possible in main() before use ServerID. +func SetGlobalServerID(id int64) {} + +// GetServerID gets id from etcd with key: metaRootPath + "/services/id" +// Each server get ServerID and add one to id. 
+func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {} // RegisterService registers the service to etcd so that other services // can find that the service is online and issue subsequent operations @@ -107,25 +127,27 @@ GetServerID(){} // value: json format // { // "ServerID": ServerID -// "ServerName": ServerName-ServerID // ServerName +// "ServerName": ServerName // ServerName // "Address": ip:port // Address of service, including ip and port // "LeaseID": LeaseID // The ID of etcd lease // } // MetaRootPath is configurable in the config file. -RegisterService(etcdKV etcdKV, serverName string, address string)(<-chan *clientv3.LeaseKeepAliveResponse, error) - - +func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {} // ProcessKeepAliveResponse processes the response of etcd keepAlive interface // If keepAlive fails for unexpected error, it will retry for default_retry_times times -ProcessKeepAliveResponse(ctx context, ch <-chan *clientv3.LeaseKeepAliveResponse) - +func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) (signal <-chan bool) {} // GetAllSessions gets all the services registered in etcd. // This gets all the key with prefix metaRootPath + "/services/" + prefix // For general, "datanode" to get all datanodes -GetSessions(prefix string) ([]session, error) +func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {} + +// WatchServices watch all events in etcd. 
+// If a server register, a session will be sent to addChannel +// If a server offline, a session will be sent to deleteChannel +func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {} ``` diff --git a/go.mod b/go.mod index 121105ae0e..b1db075aa9 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/apache/pulsar-client-go v0.4.0 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect + github.com/coreos/etcd v3.3.13+incompatible // indirect github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 9d8e579591..5193c848bf 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -54,7 +54,8 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { log.Debug("LoadWithPrefix ", zap.String("prefix", key)) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, nil, err } @@ -103,12 +104,14 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { result = append(result, "") } for _, ev := range rp.GetResponseRange().Kvs { - log.Debug("MultiLoad", zap.ByteString("key", ev.Key), zap.ByteString("value", ev.Value)) + log.Debug("MultiLoad", zap.ByteString("key", ev.Key), + zap.ByteString("value", ev.Value)) result = append(result, string(ev.Value)) } } if len(invalid) != 0 { - log.Debug("MultiLoad: there 
are invalid keys", zap.Strings("keys", invalid)) + log.Debug("MultiLoad: there are invalid keys", + zap.Strings("keys", invalid)) err = fmt.Errorf("there are invalid keys: %s", invalid) return result, err } @@ -253,3 +256,45 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv } return ch, nil } + +// CompareValueAndSwap compares the existing value with compare, and if they are +// equal, the target is stored in etcd. +func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Value(path.Join(kv.rootPath, key)), + "=", + value)). + Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + } + + return nil +} + +// CompareVersionAndSwap compares the existing key-value's version with version, and if +// they are equal, the target is stored in etcd. +func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string) error { + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Version(path.Join(kv.rootPath, key)), + "=", + version)). 
+ Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + } + + return nil +} diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 3494d3fd32..b40dc87a9d 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -341,3 +341,30 @@ func TestEtcdKV_WatchPrefix(t *testing.T) { assert.True(t, resp.Created) } + +func TestEtcdKV_CompareAndSwap(t *testing.T) { + etcdAddr, err := Params.Load("_EtcdAddress") + if err != nil { + panic(err) + } + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") + assert.Nil(t, err) + value, err := etcdKV.Load("a/b/c") + assert.Nil(t, err) + assert.Equal(t, value, "1") + err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") + assert.NotNil(t, err) + err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") + assert.Nil(t, err) + err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") + assert.NotNil(t, err) +} diff --git a/internal/util/session/session_util.go b/internal/util/session/session_util.go new file mode 100644 index 0000000000..6c771a3b56 --- /dev/null +++ b/internal/util/session/session_util.go @@ -0,0 +1,222 @@ +package session + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "time" + + "github.com/coreos/etcd/mvcc/mvccpb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/retry" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +const defaultIDKey = "services/id" +const defaultRetryTimes = 30 + +// Session is a struct to store service's session, including 
ServerID, ServerName, +// Address. +// LeaseID is assigned once the session has been registered in etcd. +type Session struct { + ServerID int64 + ServerName string + Address string + LeaseID clientv3.LeaseID +} + +var ( + globalServerID = int64(-1) +) + +// NewSession is a helper to build a Session object. LeaseID is left unset here and +// is assigned during registration. +func NewSession(serverID int64, serverName, address string) *Session { + return &Session{ + ServerID: serverID, + ServerName: serverName, + Address: address, + } +} + +// GlobalServerID returns the package-level [singleton] ServerID. +// Until SetGlobalServerID is called, GlobalServerID() returns -1. +func GlobalServerID() int64 { + return globalServerID +} + +// SetGlobalServerID sets the [singleton] ServerID, i.e. the value returned by +// GlobalServerID(). Callers that rely on GlobalServerID should call SetGlobalServerID() +// as early as possible in main(), before the ServerID is first used. +func SetGlobalServerID(id int64) { + globalServerID = id +} + +// GetServerID gets id from etcd with key: metaRootPath + "/services/id" +// Each server gets its ServerID and increments the stored id by one. 
+func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {
+	return getServerIDWithKey(etcd, defaultIDKey, defaultRetryTimes)
+}
+
+// getServerIDWithKey allocates a server ID from the counter stored under key.
+//
+// Each attempt reads the counter and then claims the read value with a
+// compare-and-swap that stores value+1, so two servers cannot obtain the same
+// ID. If the counter cannot be loaded, the attempt tries to create it with the
+// value "1" (only succeeding when the key's version is still 0) and returns 0.
+// A failed attempt is handed back to retry.Retry, which is called with
+// retryTimes and a 200ms interval.
+//
+// It returns the allocated ID, or -1 together with the last error when no
+// attempt succeeded.
+func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, error) {
+	res := int64(-1)
+	getServerIDWithKeyFn := func() error {
+		value, err := etcd.Load(key)
+		if err != nil {
+			// Any load failure is handled as "counter not created yet". Record
+			// the cause so that a real etcd problem is visible in the logs.
+			log.Debug("session", zap.String("load serverid key", key), zap.Error(err))
+			// Version 0 is what etcd reports for a key that does not exist, so
+			// this swap only succeeds for the first server to get here.
+			err = etcd.CompareVersionAndSwap(key, 0, "1")
+			if err != nil {
+				log.Debug("session", zap.Error(err))
+				return err
+			}
+			res = 0
+			return nil
+		}
+		log.Debug("session", zap.String("get serverid", value))
+		valueInt, err := strconv.ParseInt(value, 10, 64)
+		if err != nil {
+			log.Debug("session", zap.Error(err))
+			return err
+		}
+		// Claim valueInt by advancing the counter; this fails (and the attempt
+		// is retried) when another server changed the value in the meantime.
+		err = etcd.CompareValueAndSwap(key, value,
+			strconv.FormatInt(valueInt+1, 10))
+		if err != nil {
+			log.Debug("session", zap.Error(err))
+			return err
+		}
+		res = valueInt
+		return nil
+	}
+
+	err := retry.Retry(retryTimes, time.Millisecond*200, getServerIDWithKeyFn)
+	return res, err
+}
+
+// RegisterService registers the service to etcd so that other services
+// can find that the service is online and issue subsequent operations
+// RegisterService will save a key-value in etcd
+// key: metaRootPath + "/services" + "/ServerName-ServerID"
+// value: json format
+// {
+//   "ServerID": ServerID
+//   "ServerName": ServerName // ServerName
+//   "Address": ip:port // Address of service, including ip and port
+//   "LeaseID": LeaseID // The ID of etcd lease
+// }
+// MetaRootPath is configurable in the config file.
+func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
+	// The lease is granted first so that its ID can be stored inside the
+	// session value that is written under that lease.
+	respID, err := etcdKV.Grant(ttl)
+	if err != nil {
+		log.Error("register service", zap.Error(err))
+		return nil, err
+	}
+	session.LeaseID = respID
+
+	sessionJSON, err := json.Marshal(session)
+	if err != nil {
+		return nil, err
+	}
+
+	err = etcdKV.SaveWithLease(fmt.Sprintf("/services/%s-%d", session.ServerName, session.ServerID),
+		string(sessionJSON), respID)
+	if err != nil {
+		log.Error("register service: put lease", zap.Error(err))
+		return nil, err
+	}
+
+	ch, err := etcdKV.KeepAlive(respID)
+	if err != nil {
+		log.Error("register service: keep alive", zap.Error(err))
+		return nil, err
+	}
+	return ch, nil
+}
+
+// ProcessKeepAliveResponse processes the response of etcd keepAlive interface.
+// For every message read from ch it sends a signal on the returned channel:
+// true for a non-nil response, false for a nil one.
+// When ch is closed, keepAlive has stopped: false is sent once, the returned
+// channel is closed (so every later receive yields false without blocking) and
+// the goroutine exits.
+// The goroutine also exits when ctx is done, even while a signal is waiting to
+// be received; in that case the returned channel is left open.
+func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) (signal <-chan bool) {
+	signalOut := make(chan bool)
+	// send delivers v to the receiver unless ctx is cancelled first, and
+	// reports whether the value was delivered.
+	send := func(v bool) bool {
+		select {
+		case signalOut <- v:
+			return true
+		case <-ctx.Done():
+			return false
+		}
+	}
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				log.Error("keep alive", zap.Error(errors.New("context done")))
+				return
+			case resp, ok := <-ch:
+				if !ok {
+					// A closed channel is always ready to receive; returning
+					// here keeps the loop from spinning on it forever.
+					send(false)
+					close(signalOut)
+					return
+				}
+				if !send(resp != nil) {
+					log.Error("keep alive", zap.Error(errors.New("context done")))
+					return
+				}
+			}
+		}
+	}()
+	return signalOut
+}
+
+// GetSessions gets all the services registered in etcd.
+// This gets all the key with prefix metaRootPath + "/services/" + prefix
+// For general, "datanode" to get all datanodes
+// Every stored value is decoded from JSON into a Session; the first value that
+// fails to decode aborts the call with that error. When nothing matches, an
+// empty, non-nil slice is returned.
+func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {
+	_, resValue, err := etcdKV.LoadWithPrefix("/services/" + prefix)
+	if err != nil {
+		return nil, err
+	}
+	sessions := make([]*Session, 0, len(resValue))
+	for _, value := range resValue {
+		session := &Session{}
+		if err = json.Unmarshal([]byte(value), session); err != nil {
+			return nil, err
+		}
+		sessions = append(sessions, session)
+	}
+	return sessions, nil
+}
+
+// WatchServices watch all events in etcd.
+// If a server register, a session will be sent to addChannel
+// If a server offline, a session will be sent to deleteChannel
+// Only keys under "/services/" + prefix are watched. Events whose value cannot
+// be decoded into a Session are logged and skipped.
+// The watching goroutine stops when ctx is done or when the etcd watch channel
+// is closed; the returned channels are not closed.
+// NOTE(review): a removed session can only be reported when the event carries
+// the previous key-value (ev.PrevKv), which requires the watch to be created
+// with clientv3.WithPrevKV() - confirm that WatchWithPrefix does so.
+func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {
+	addCh := make(chan *Session, 10)
+	deleteCh := make(chan *Session, 10)
+	rch := etcdKV.WatchWithPrefix("/services/" + prefix)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case wresp, ok := <-rch:
+				if !ok {
+					return
+				}
+				for _, ev := range wresp.Events {
+					// A DELETE event has no value in ev.Kv; the content of the
+					// removed session is only available through ev.PrevKv.
+					kv := ev.Kv
+					if ev.Type == mvccpb.DELETE && ev.PrevKv != nil {
+						kv = ev.PrevKv
+					}
+					session := &Session{}
+					err := json.Unmarshal(kv.Value, session)
+					if err != nil {
+						log.Error("watch services", zap.Error(err))
+						continue
+					}
+					var out chan *Session
+					switch ev.Type {
+					case mvccpb.PUT:
+						log.Debug("watch services",
+							zap.Any("addchannel kv", ev.Kv))
+						out = addCh
+					case mvccpb.DELETE:
+						log.Debug("watch services",
+							zap.Any("deletechannel kv", ev.Kv))
+						out = deleteCh
+					default:
+						continue
+					}
+					// Do not block forever on a full channel once the caller
+					// has cancelled ctx and stopped receiving.
+					select {
+					case out <- session:
+					case <-ctx.Done():
+						return
+					}
+				}
+
+			}
+		}
+	}()
+	return addCh, deleteCh
+}
diff --git a/internal/util/session/session_util_test.go b/internal/util/session/session_util_test.go
new file mode 100644
index 0000000000..bbb1759bca
--- /dev/null
+++ b/internal/util/session/session_util_test.go
@@ -0,0 +1,134 @@
+package session
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/util/paramtable"
+	"github.com/stretchr/testify/assert"
+	"go.etcd.io/etcd/clientv3"
+	"golang.org/x/net/context"
+)
+
+var Params paramtable.BaseTable
+
+func TestGetServerID(t *testing.T) {
+	Params.Init()
+
+	etcdAddr, err := Params.Load("_EtcdAddress")
+	if err != nil {
+		panic(err)
+	}
+
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	assert.Nil(t, err)
+	rootPath := "/etcd/test/root"
+	etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
+
+	defer etcdKV.Close()
+	defer etcdKV.RemoveWithPrefix("")
+
+	var wg sync.WaitGroup
+	var muList sync.Mutex = sync.Mutex{}
+
+	res := make([]int64, 0)
+
+	getIDFunc := func() {
+		id, err := GetServerID(etcdKV)
+		assert.Nil(t, err)
+		muList.Lock()
+		res = append(res, id)
+		muList.Unlock()
+		wg.Done()
+	}
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go getIDFunc()
+	}
+	wg.Wait()
+	for i := 0; i < 10; i++ {
+		assert.Contains(t, res, int64(i))
+	}
+
+}
+
+func TestRegister(t *testing.T) {
+	Params.Init()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	etcdAddr, err := Params.Load("_EtcdAddress")
+	if err != nil {
+		panic(err)
+	}
+
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	assert.Nil(t, err)
+	rootPath := "/etcd/test/root"
+	etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
+
+	defer etcdKV.Close()
+	defer etcdKV.RemoveWithPrefix("")
+
+	addChannel, deletechannel := WatchServices(ctx, etcdKV, "test")
+	for i := 0; i < 10; i++ {
+		id, err := GetServerID(etcdKV)
+		assert.Nil(t, err)
+		session := NewSession(id, "test", "localhost")
+		_, err = RegisterService(etcdKV, session, 10)
+		assert.Nil(t, err)
+		sessionReturn := <-addChannel
+		assert.Equal(t, sessionReturn, session)
+	}
+
+	sessions, err := GetSessions(etcdKV, "test")
+	assert.Nil(t, err)
+	assert.Equal(t, len(sessions), 10)
+	// NOTE(review): this loop starts at i == 10 with the condition i < 10, so its
+	// body never runs and the Remove/deletechannel path is not exercised here.
+	// Presumably i := 0 was intended; confirm before changing it.
+	for i := 10; i < 10; i++ {
+		assert.Equal(t, sessions[i].ServerID, int64(i))
+		err = etcdKV.Remove(fmt.Sprintf("test-%d", i))
+		assert.Nil(t, err)
+		sessionReturn := <-deletechannel
+		assert.Equal(t, sessionReturn, sessions[i])
+	}
+}
+
+func TestKeepAlive(t *testing.T) {
+	Params.Init()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	etcdAddr, err := Params.Load("_EtcdAddress")
+	if err != nil {
+		panic(err)
+	}
+
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	assert.Nil(t, err)
+	rootPath := "/etcd/test/root"
+	etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
+
+	defer etcdKV.Close()
+	defer etcdKV.RemoveWithPrefix("")
+
+	id, err := GetServerID(etcdKV)
+	assert.Nil(t, err)
+	session := NewSession(id, "test", "localhost")
+	ch, err := RegisterService(etcdKV, session, 10)
+	assert.Nil(t, err)
+	aliveCh := ProcessKeepAliveResponse(ctx, ch)
+
+	signal := <-aliveCh
+	assert.Equal(t, signal, true)
+
+	sessions, err := GetSessions(etcdKV, "test")
+	assert.Nil(t, err)
+	assert.Equal(t, len(sessions), 1)
+	assert.Equal(t, sessions[0].ServerID, int64(0))
+}
diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 8af8e2af29..8b9a83e2ba 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -14,6 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" MILVUS_DIR="${ROOT_DIR}/internal/" echo $MILVUS_DIR +go test -race -cover "${MILVUS_DIR}/util/session/..." -failfast go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast