From 1e3fc5076ffe99680b2e4419a65b7d913062298f Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 1 Nov 2021 10:19:55 +0800 Subject: [PATCH] Change etcdkv clientv3 into MetaKv interface (#10903) This pr: - changed etcdkv clientv3 into MetaKv interface - replaced fmt.Sprintf with path.Join for kv key See also: #8058 Signed-off-by: yangxuan --- internal/datanode/data_node.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index aac980308e..dac59ca475 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "math/rand" + "path" "strconv" "strings" "sync" @@ -37,6 +38,7 @@ import ( "go.uber.org/zap" "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/logutil" @@ -105,8 +107,8 @@ type DataNode struct { rootCoord types.RootCoord dataCoord types.DataCoord - session *sessionutil.Session - kvClient *etcdkv.EtcdKV + session *sessionutil.Session + watchKv kv.MetaKv closer io.Closer @@ -189,7 +191,7 @@ func (node *DataNode) Register() error { return nil } -// Init function do nothing now. +// Init function does nothing now. func (node *DataNode) Init() error { log.Debug("DataNode Init", zap.String("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName), @@ -203,8 +205,8 @@ func (node *DataNode) Init() error { func (node *DataNode) StartWatchChannels(ctx context.Context) { defer logutil.LogPanic() // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} - watchPrefix := fmt.Sprintf("%s/%d", Params.ChannelWatchSubPath, node.NodeID) - evtChan := node.kvClient.WatchWithPrefix(watchPrefix) + watchPrefix := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID)) + evtChan := node.watchKv.WatchWithPrefix(watchPrefix) // after watch, first check all exists nodes first err := node.checkWatchedList() if err != nil { @@ -243,8 +245,8 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) { // serves the corner case for etcd connection lost and missing some events func (node *DataNode) checkWatchedList() error { // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} - prefix := fmt.Sprintf("%s/%d", Params.ChannelWatchSubPath, node.NodeID) - keys, values, err := node.kvClient.LoadWithPrefix(prefix) + prefix := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID)) + keys, values, err := node.watchKv.LoadWithPrefix(prefix) if err != nil { return err } @@ -291,7 +293,8 @@ func (node *DataNode) handleWatchInfo(key string, data []byte) { log.Warn("fail to Marshal watchInfo", zap.String("key", key), zap.Error(err)) return } - err = node.kvClient.Save(fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, watchInfo.Vchan.ChannelName), string(v)) + k := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), watchInfo.GetVchan().GetChannelName()) + err = node.watchKv.Save(k, string(v)) if err != nil { log.Warn("fail to change WatchState to complete", zap.String("key", key), zap.Error(err)) node.ReleaseDataSyncService(key) @@ -395,7 +398,7 @@ func (node *DataNode) Start() error { if err != nil { return err } - node.kvClient = etcdKV + node.watchKv = etcdKV return nil } err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))