rename service to coord (#6020)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-06-23 16:14:08 +08:00 committed by GitHub
parent 9ebca59099
commit a09a3a1905
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 93 additions and 94 deletions

2
.gitignore vendored
View File

@ -23,7 +23,7 @@ cmake_build
# Docker generated cache file # Docker generated cache file
.docker/ .docker/
# proxynode # proxy
proxy/milvus proxy/milvus
proxy/cmake_build proxy/cmake_build
proxy/cmake-build-debug proxy/cmake-build-debug

View File

@ -12,8 +12,8 @@
msgChannel: msgChannel:
# channel name generation rule: ${namePrefix}-${ChannelIdx} # channel name generation rule: ${namePrefix}-${ChannelIdx}
chanNamePrefix: chanNamePrefix:
masterTimeTick: "master-timetick" rootCoordTimeTick: "rootcoord-timetick"
masterStatistics: "master-statistics" rootCoordStatistics: "rootcoord-statistics"
search: "search" search: "search"
searchResult: "searchResult" searchResult: "searchResult"
proxyTimeTick: "proxyTimeTick" proxyTimeTick: "proxyTimeTick"
@ -28,7 +28,7 @@ msgChannel:
# sub name generation rule: ${subNamePrefix}-${NodeID} # sub name generation rule: ${subNamePrefix}-${NodeID}
subNamePrefix: subNamePrefix:
masterSubNamePrefix: "master" rootCoordSubNamePrefix: "rootcoord"
proxySubNamePrefix: "proxy" proxySubNamePrefix: "proxy"
queryNodeSubNamePrefix: "queryNode" queryNodeSubNamePrefix: "queryNode"
dataNodeSubNamePrefix: "dataNode" dataNodeSubNamePrefix: "dataNode"

View File

@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License. # or implied. See the License for the specific language governing permissions and limitations under the License.
master: rootcoord:
maxPartitionNum: 4096 maxPartitionNum: 4096
minSegmentSizeToEnableIndex: 1024 minSegmentSizeToEnableIndex: 1024
timeout: 3600 # time out, 5 seconds timeout: 3600 # time out, 5 seconds

View File

@ -17,8 +17,8 @@ can appear in anywhere in vchannel.**
## What does DataNode recovery really mean? ## What does DataNode recovery really mean?
DataNode is stateless, but vchannel has states. DataNode's statelessness is guranteed by DataService, which DataNode is stateless, but vchannel has states. DataNode's statelessness is guranteed by DataCoord, which
means the vchannel's states is maintained by DataService. So DataNode recovery has no different as starting. means the vchannel's states is maintained by DataCoord. So DataNode recovery has no different as starting.
So what's DataNode's starting procedure? So what's DataNode's starting procedure?
@ -30,7 +30,7 @@ DataNode registers itself to Etcd after grpc server started, in *INITIALIZING* s
### 2. Service Discovery ### 2. Service Discovery
DataNode discovers DataService and MasterService, in *HEALTHY* and *IDLE* state. DataNode discovers DataCoord and RootCoord, in *HEALTHY* and *IDLE* state.
### 3. Flowgraph Recovery ### 3. Flowgraph Recovery
@ -41,10 +41,10 @@ After DataNode subscribes to a stateful vchannel, DataNode starts to work, or mo
Vchannel is stateful because we don't want to process twice what's already processed. And a "processed" message means its Vchannel is stateful because we don't want to process twice what's already processed. And a "processed" message means its
already persistant. In DataNode's terminology, a message is processed if it's been flushed. already persistant. In DataNode's terminology, a message is processed if it's been flushed.
DataService tells DataNode stateful vchannel infos through RPC `WatchDmChannels`, so that DataNode won't process DataCoord tells DataNode stateful vchannel infos through RPC `WatchDmChannels`, so that DataNode won't process
the same messages over and over again. So flowgraph needs ability to consume messages in the middle of a vchannel. the same messages over and over again. So flowgraph needs ability to consume messages in the middle of a vchannel.
DataNode tells DataService vchannel states after each flush through RPC `SaveBinlogPaths`, so that DataService DataNode tells DataCoord vchannel states after each flush through RPC `SaveBinlogPaths`, so that DataCoord
keep the vchannel states update. keep the vchannel states update.
@ -52,12 +52,12 @@ keep the vchannel states update.
### 1. DataNode no longer interacts with Etcd except service registering ### 1. DataNode no longer interacts with Etcd except service registering
#### DataService rather than DataNode saves binlog paths into Etcd #### DataCoord rather than DataNode saves binlog paths into Etcd
![datanode_design](graphs/datanode_design_01.jpg) ![datanode_design](graphs/datanode_design_01.jpg)
##### DataService RPC Design ##### DataCoord RPC Design
```proto ```proto
rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){} rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){}

View File

@ -1,12 +1,12 @@
### timetick 相关改动 ### timetick 相关改动
Datanode发送timetick msg需要有channel的信息DataService根据channel来检查segment是否可以seal和flush Datanode发送timetick msg需要有channel的信息DataCoord根据channel来检查segment是否可以seal和flush
### 服务发现 ### 服务发现
DataService启动时检查是否有新的或重启过的DataNode如果有重启过的重新注册channel并seek到上次记录的位置 DataCoord启动时检查是否有新的或重启过的DataNode如果有重启过的重新注册channel并seek到上次记录的位置
通过watch机制监听DataNode的状态如果DataNode下线其注册的channel重新分配到其他node并seek到上次的位置重新分配不一定现在做 通过watch机制监听DataNode的状态如果DataNode下线其注册的channel重新分配到其他node并seek到上次的位置重新分配不一定现在做
@ -14,14 +14,14 @@ DataService启动时检查是否有新的或重启过的DataNode如果有重
如果监听到有新的DataNode注册记录其状态后续向其注册channel或进行load balanceload balance不一定现在做 如果监听到有新的DataNode注册记录其状态后续向其注册channel或进行load balanceload balance不一定现在做
DataNode如果由于网络原因与etcd断开应该重启服务发现DataService会去重新注册channelDataNode不能重复监听相同channel DataNode如果由于网络原因与etcd断开应该重启服务发现DataCoord会去重新注册channelDataNode不能重复监听相同channel
### 需要记录的信息 ### 需要记录的信息
1. cluster信息包括Datanode的节点及其上面的channel 1. cluster信息包括Datanode的节点及其上面的channel
2. segment 分配信息最后一次分配的过期时间segment的上限等 2. segment 分配信息最后一次分配的过期时间segment的上限等
3. stats 和 segment flush channel最后位置 3. stats 和 segment flush channel最后位置
4. DataNode向DataService发送channel的最后位置 4. DataNode向DataCoord发送channel的最后位置
### 重启恢复流程 ### 重启恢复流程
@ -39,7 +39,7 @@ DataNode如果由于网络原因与etcd断开应该重启服务发现DataS
4. 启动服务发现 4. 启动服务发现
目前channel注册是segment allocation驱动的每次请求segment时检查其channel有没有被注册没有则将其注册到DataNode并保存到本地。这里有个问题如果channel注册成功DataService挂掉了那么在重启后DataService并不知道已经注册成功来一个新的请求还是会去注册而且可能注册到不同的DataNode上面。类似Transaction的情况需要有一套机制保证原子性在多节点写入目前不太好解决。可以用以下步骤临时解决 目前channel注册是segment allocation驱动的每次请求segment时检查其channel有没有被注册没有则将其注册到DataNode并保存到本地。这里有个问题如果channel注册成功DataCoord挂掉了那么在重启后DataCoord并不知道已经注册成功来一个新的请求还是会去注册而且可能注册到不同的DataNode上面。类似Transaction的情况需要有一套机制保证原子性在多节点写入目前不太好解决。可以用以下步骤临时解决
1. 在etcd上记录分配方法entry状态是未完成 1. 在etcd上记录分配方法entry状态是未完成
2. 向DataNode注册 2. 向DataNode注册
@ -51,7 +51,7 @@ DataNode如果由于网络原因与etcd断开应该重启服务发现DataS
DataService模块中有些策略是可能频繁改变的比如channel对DataNode的分配策略可以是随机/顺序/平均/根据collection分散等等策略比如检测到DataNode创建和下线可能会不处理/balance/将下线节点的channel转移到其他节点等。比如segment allocation可能会根据文件大小/条数等来确定是否关闭。实现应该把这些策略相关抽出来,方便以后修改。 DataCoord模块中有些策略是可能频繁改变的比如channel对DataNode的分配策略可以是随机/顺序/平均/根据collection分散等等策略比如检测到DataNode创建和下线可能会不处理/balance/将下线节点的channel转移到其他节点等。比如segment allocation可能会根据文件大小/条数等来确定是否关闭。实现应该把这些策略相关抽出来,方便以后修改。

View File

@ -33,7 +33,7 @@ QueryNode subscribes to insert channel, and will determine whether to use the da
When the DataNode processes each inserted entity, it updates the bloomfilter of the Segment to which the entity belongs. If it does not exist, it creates a bloomfilter in memory and updates it. When the DataNode processes each inserted entity, it updates the bloomfilter of the Segment to which the entity belongs. If it does not exist, it creates a bloomfilter in memory and updates it.
Once DataNode receives a Flush command from DataService, sorts the data in the segment in ascending order of primary key, records the maximum and minimum values of primary key, and writes the segment, statistics and bloomfilter to storage system. Once DataNode receives a Flush command from DataCoord, sorts the data in the segment in ascending order of primary key, records the maximum and minimum values of primary key, and writes the segment, statistics and bloomfilter to storage system.
- Key of binlog file: `${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/_${log_idx}` - Key of binlog file: `${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/_${log_idx}`
- Key of staticstics file: `${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/stats_${log_idx}` - Key of staticstics file: `${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/stats_${log_idx}`

View File

@ -284,8 +284,8 @@ func (ta *Allocator) CleanCache() error
type IDAllocator struct { type IDAllocator struct {
Allocator Allocator
masterAddress string rootCoordAddress string
master types.MasterService rootCoord types.RootCoord
countPerRPC uint32 countPerRPC uint32
@ -377,8 +377,8 @@ func (t *timestampOracle) ResetTimestamp()
type TimestampAllocator struct { type TimestampAllocator struct {
Allocator Allocator
masterAddress string rootCoordAddress string
masterClient types.MasterService rootCoordClient types.RootCoord
countPerRPC uint32 countPerRPC uint32
lastTsBegin Timestamp lastTsBegin Timestamp

View File

@ -11,7 +11,7 @@
#### 8.2 Index Service Interface #### 8.2 Index Service Interface
```go ```go
type IndexService interface { type IndexCoord interface {
Component Component
TimeTickProvider TimeTickProvider

View File

@ -70,7 +70,7 @@ type InvalidateCollMetaCacheRequest struct {
#### 6.1 Proxy Node Interface #### 6.1 Proxy Node Interface
```go ```go
type ProxyNode interface { type Proxy interface {
Component Component
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
@ -89,7 +89,7 @@ type InvalidateCollMetaCacheRequest struct {
#### 6.2 Milvus Service Interface #### 6.2 Milvus Service Interface
ProxyNode also implements Milvus Service interface to receive client grpc call. Proxy also implements Milvus Service interface to receive client grpc call.
```go ```go
type MilvusService interface { type MilvusService interface {
@ -332,11 +332,10 @@ type Proxy struct {
stateCode internalpb.StateCode stateCode internalpb.StateCode
masterClient MasterClient rootCoordClient RootCoordClient
indexServiceClient IndexServiceClient indexCoordClient IndexCoordClient
dataServiceClient DataServiceClient dataCoordClient DataCoordClient
proxyServiceClient ProxyServiceClient queryCoordClient QueryCoordClient
queryServiceClient QueryServiceClient
sched *TaskScheduler sched *TaskScheduler
tick *timeTick tick *timeTick
@ -361,13 +360,13 @@ func (node *NodeImpl) AddStartCallback(callbacks ...func())
func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error
func (node *NodeImpl) lastTick() Timestamp func (node *NodeImpl) lastTick() Timestamp
func (node *NodeImpl) AddCloseCallback(callbacks ...func()) func (node *NodeImpl) AddCloseCallback(callbacks ...func())
func (node *NodeImpl) SetMasterClient(cli MasterClient) func (node *NodeImpl) SetRootCoordClient(cli RootCoordClient)
func (node *NodeImpl) SetIndexServiceClient(cli IndexServiceClient) func (node *NodeImpl) SetIndexCoordClient(cli IndexCoordClient)
func (node *NodeImpl) SetDataServiceClient(cli DataServiceClient) func (node *NodeImpl) SetDataCoordClient(cli DataCoordClient)
func (node *NodeImpl) SetProxyServiceClient(cli ProxyServiceClient) func (node *NodeImpl) SetProxyCoordClient(cli ProxyCoordClient)
func (node *NodeImpl) SetQueryServiceClient(cli QueryServiceClient) func (node *NodeImpl) SetQueryCoordClient(cli QueryCoordClient)
func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) func NewProxyImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error)
``` ```
#### Global Parameter Table #### Global Parameter Table

View File

@ -454,7 +454,7 @@ type Core struct {
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
NewProxyClient func(sess *sessionutil.Session) (types.ProxyNode, error) NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)
//query service interface, notify query service to release collection //query service interface, notify query service to release collection
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
@ -465,8 +465,8 @@ type Core struct {
//dml channels //dml channels
dmlChannels *dmlChannels dmlChannels *dmlChannels
//ProxyNode manager //Proxy manager
proxyNodeManager *proxyNodeManager proxyManager *proxyManager
// proxy clients // proxy clients
proxyClientManager *proxyClientManager proxyClientManager *proxyClientManager
@ -689,11 +689,11 @@ type timetickSync struct {
func newTimeTickSync(core *Core) *timetickSync func newTimeTickSync(core *Core) *timetickSync
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error
func (t *timetickSync) AddProxyNode(sess *sessionutil.Session) func (t *timetickSync) AddProxy(sess *sessionutil.Session)
func (t *timetickSync) DelProxyNode(sess *sessionutil.Session) func (t *timetickSync) DelProxy(sess *sessionutil.Session)
func (t *timetickSync) GetProxyNodes(sess []*sessionutil.Session) func (t *timetickSync) GetProxy(sess []*sessionutil.Session)
func (t *timetickSync) StartWatch() func (t *timetickSync) StartWatch()
func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error
func (t *timetickSync) GetProxyNodeNum() func (t *timetickSync) GetProxyNum()
func (t *timetickSync) GetChanNum() int func (t *timetickSync) GetChanNum() int
``` ```

View File

@ -332,7 +332,7 @@ func (replica *SegmentReplica) getCollectionID() UniqueID {
return replica.collectionID return replica.collectionID
} }
// getCollectionSchema will get collection schema from masterservice for a certain time. // getCollectionSchema will get collection schema from rootcoord for a certain time.
// If you want the latest collection schema, ts should be 0 // If you want the latest collection schema, ts should be 0
func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) { func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
replica.segMu.Lock() replica.segMu.Lock()

View File

@ -63,7 +63,7 @@ func (c *Client) Init() error {
func (c *Client) connect() error { func (c *Client) connect() error {
connectGrpcFunc := func() error { connectGrpcFunc := func() error {
opts := trace.GetInterceptorOpts() opts := trace.GetInterceptorOpts()
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr)) log.Debug("ProxyClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr, conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor( grpc.WithUnaryInterceptor(

View File

@ -61,7 +61,7 @@ func getQueryCoordAddress(sess *sessionutil.Session) (string, error) {
return ms.Address, nil return ms.Address, nil
} }
// NewClient creates a client for QueryService grpc call. // NewClient creates a client for QueryCoord grpc call.
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*Client, error) { func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*Client, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints) sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil { if sess == nil {

View File

@ -47,9 +47,9 @@ package grpcquerycoordclient
// defer cancel() // defer cancel()
// } // }
// //
// //create queryService client // //create queryCoord client
// qs.Params.Init() // qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address) // log.Println("QueryCoord address:", qs.Params.Address)
// log.Println("Init Query service client ...") // log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second) // client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err) // assert.Nil(t, err)
@ -124,10 +124,10 @@ package grpcquerycoordclient
// defer cancel() // defer cancel()
// } // }
// //
// //create queryService client // //create queryCoord client
// qs.Params.Init() // qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address) // log.Println("QueryCoord address:", qs.Params.Address)
// log.Println("Init Query service client ...") // log.Println("Init Query Coord client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second) // client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err) // assert.Nil(t, err)
// err = client.Init() // err = client.Init()
@ -150,9 +150,9 @@ package grpcquerycoordclient
// defer cancel() // defer cancel()
// } // }
// //
// //create queryService client // //create queryCoord client
// qs.Params.Init() // qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address) // log.Println("QueryCoord address:", qs.Params.Address)
// log.Println("Init Query service client ...") // log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second) // client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err) // assert.Nil(t, err)
@ -176,9 +176,9 @@ package grpcquerycoordclient
// defer cancel() // defer cancel()
// } // }
// //
// //create queryService client // //create queryCoord client
// qs.Params.Init() // qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address) // log.Println("QueryCoord address:", qs.Params.Address)
// log.Println("Init Query service client ...") // log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second) // client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err) // assert.Nil(t, err)

View File

@ -141,7 +141,7 @@ func (s *Server) init() error {
dataCoord, err := dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, retry.Attempts(300)) dataCoord, err := dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil { if err != nil {
log.Debug("QueryService try to new DataCoord client failed", zap.Error(err)) log.Debug("QueryCoord try to new DataCoord client failed", zap.Error(err))
panic(err) panic(err)
} }
if err = dataCoord.Init(); err != nil { if err = dataCoord.Init(); err != nil {

View File

@ -244,7 +244,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
} }
func (s *Server) start() error { func (s *Server) start() error {
log.Debug("Master Core start ...") log.Debug("RootCoord Core start ...")
if err := s.rootCoord.Start(); err != nil { if err := s.rootCoord.Start(); err != nil {
return err return err
} }

View File

@ -111,7 +111,7 @@ func (node *Proxy) Init() error {
log.Debug("Proxy dataCoord is ready") log.Debug("Proxy dataCoord is ready")
} }
// wait for queryService state changed to Healthy // wait for queryCoord state changed to Healthy
if node.queryCoord != nil { if node.queryCoord != nil {
log.Debug("Proxy wait for queryCoord ready") log.Debug("Proxy wait for queryCoord ready")
err := funcutil.WaitForComponentHealthy(node.ctx, node.queryCoord, "QueryCoord", 1000000, time.Millisecond*200) err := funcutil.WaitForComponentHealthy(node.ctx, node.queryCoord, "QueryCoord", 1000000, time.Millisecond*200)

View File

@ -160,7 +160,7 @@ func (node *QueryNode) Init() error {
// }, // },
//} //}
// //
//resp, err := node.queryService.RegisterNode(ctx, registerReq) //resp, err := node.queryCoord.RegisterNode(ctx, registerReq)
//if err != nil { //if err != nil {
// log.Debug("QueryNode RegisterNode failed", zap.Error(err)) // log.Debug("QueryNode RegisterNode failed", zap.Error(err))
// panic(err) // panic(err)

View File

@ -111,7 +111,7 @@ func (mt *metaTable) reloadFromKV() error {
tenantMeta := pb.TenantMeta{} tenantMeta := pb.TenantMeta{}
err := proto.UnmarshalText(value, &tenantMeta) err := proto.UnmarshalText(value, &tenantMeta)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.TenantMeta err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err)
} }
mt.tenantID2Meta[tenantMeta.ID] = tenantMeta mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
} }
@ -125,7 +125,7 @@ func (mt *metaTable) reloadFromKV() error {
proxyMeta := pb.ProxyMeta{} proxyMeta := pb.ProxyMeta{}
err = proto.UnmarshalText(value, &proxyMeta) err = proto.UnmarshalText(value, &proxyMeta)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.ProxyMeta err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err)
} }
mt.proxyID2Meta[proxyMeta.ID] = proxyMeta mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
} }
@ -139,7 +139,7 @@ func (mt *metaTable) reloadFromKV() error {
collInfo := pb.CollectionInfo{} collInfo := pb.CollectionInfo{}
err = proto.UnmarshalText(value, &collInfo) err = proto.UnmarshalText(value, &collInfo)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.CollectionInfo err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err)
} }
mt.collID2Meta[collInfo.ID] = collInfo mt.collID2Meta[collInfo.ID] = collInfo
mt.collName2ID[collInfo.Schema.Name] = collInfo.ID mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
@ -156,7 +156,7 @@ func (mt *metaTable) reloadFromKV() error {
partitionInfo := pb.PartitionInfo{} partitionInfo := pb.PartitionInfo{}
err = proto.UnmarshalText(value, &partitionInfo) err = proto.UnmarshalText(value, &partitionInfo)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.PartitionInfo err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.PartitionInfo err:%w", err)
} }
collID, ok := mt.partitionID2CollID[partitionInfo.PartitionID] collID, ok := mt.partitionID2CollID[partitionInfo.PartitionID]
if !ok { if !ok {
@ -179,7 +179,7 @@ func (mt *metaTable) reloadFromKV() error {
segmentIndexInfo := pb.SegmentIndexInfo{} segmentIndexInfo := pb.SegmentIndexInfo{}
err = proto.UnmarshalText(value, &segmentIndexInfo) err = proto.UnmarshalText(value, &segmentIndexInfo)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.SegmentIndexInfo err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err)
} }
idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID] idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
if ok { if ok {
@ -199,7 +199,7 @@ func (mt *metaTable) reloadFromKV() error {
meta := pb.IndexInfo{} meta := pb.IndexInfo{}
err = proto.UnmarshalText(value, &meta) err = proto.UnmarshalText(value, &meta)
if err != nil { if err != nil {
return fmt.Errorf("MasterService UnmarshalText pb.IndexInfo err:%w", err) return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err)
} }
mt.indexID2Meta[meta.IndexID] = meta mt.indexID2Meta[meta.IndexID] = meta
} }

View File

@ -73,7 +73,7 @@ func Test_MockKV(t *testing.T) {
prefix[TenantMetaPrefix] = []string{"tenant-prefix"} prefix[TenantMetaPrefix] = []string{"tenant-prefix"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.TenantMeta err:line 1.0: unknown field name \"tenant-prefix\" in milvus.proto.etcd.TenantMeta") assert.EqualError(t, err, "RootCoord UnmarshalText pb.TenantMeta err:line 1.0: unknown field name \"tenant-prefix\" in milvus.proto.etcd.TenantMeta")
prefix[TenantMetaPrefix] = []string{proto.MarshalTextString(&pb.TenantMeta{})} prefix[TenantMetaPrefix] = []string{proto.MarshalTextString(&pb.TenantMeta{})}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
@ -82,7 +82,7 @@ func Test_MockKV(t *testing.T) {
prefix[ProxyMetaPrefix] = []string{"porxy-meta"} prefix[ProxyMetaPrefix] = []string{"porxy-meta"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.ProxyMeta err:line 1.0: unknown field name \"porxy-meta\" in milvus.proto.etcd.ProxyMeta") assert.EqualError(t, err, "RootCoord UnmarshalText pb.ProxyMeta err:line 1.0: unknown field name \"porxy-meta\" in milvus.proto.etcd.ProxyMeta")
prefix[ProxyMetaPrefix] = []string{proto.MarshalTextString(&pb.ProxyMeta{})} prefix[ProxyMetaPrefix] = []string{proto.MarshalTextString(&pb.ProxyMeta{})}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
@ -91,7 +91,7 @@ func Test_MockKV(t *testing.T) {
prefix[CollectionMetaPrefix] = []string{"collection-meta"} prefix[CollectionMetaPrefix] = []string{"collection-meta"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.CollectionInfo err:line 1.0: unknown field name \"collection-meta\" in milvus.proto.etcd.CollectionInfo") assert.EqualError(t, err, "RootCoord UnmarshalText pb.CollectionInfo err:line 1.0: unknown field name \"collection-meta\" in milvus.proto.etcd.CollectionInfo")
prefix[CollectionMetaPrefix] = []string{proto.MarshalTextString(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})} prefix[CollectionMetaPrefix] = []string{proto.MarshalTextString(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
@ -100,7 +100,7 @@ func Test_MockKV(t *testing.T) {
prefix[PartitionMetaPrefix] = []string{"partition-meta"} prefix[PartitionMetaPrefix] = []string{"partition-meta"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.PartitionInfo err:line 1.0: unknown field name \"partition-meta\" in milvus.proto.etcd.PartitionInfo") assert.EqualError(t, err, "RootCoord UnmarshalText pb.PartitionInfo err:line 1.0: unknown field name \"partition-meta\" in milvus.proto.etcd.PartitionInfo")
prefix[PartitionMetaPrefix] = []string{proto.MarshalTextString(&pb.PartitionInfo{})} prefix[PartitionMetaPrefix] = []string{proto.MarshalTextString(&pb.PartitionInfo{})}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
@ -109,7 +109,7 @@ func Test_MockKV(t *testing.T) {
prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"} prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.SegmentIndexInfo err:line 1.0: unknown field name \"segment-index-meta\" in milvus.proto.etcd.SegmentIndexInfo") assert.EqualError(t, err, "RootCoord UnmarshalText pb.SegmentIndexInfo err:line 1.0: unknown field name \"segment-index-meta\" in milvus.proto.etcd.SegmentIndexInfo")
prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{})} prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{})}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
@ -123,7 +123,7 @@ func Test_MockKV(t *testing.T) {
prefix[IndexMetaPrefix] = []string{"index-meta"} prefix[IndexMetaPrefix] = []string{"index-meta"}
_, err = NewMetaTable(k1) _, err = NewMetaTable(k1)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "MasterService UnmarshalText pb.IndexInfo err:line 1.0: unknown field name \"index-meta\" in milvus.proto.etcd.IndexInfo") assert.EqualError(t, err, "RootCoord UnmarshalText pb.IndexInfo err:line 1.0: unknown field name \"index-meta\" in milvus.proto.etcd.IndexInfo")
prefix[IndexMetaPrefix] = []string{proto.MarshalTextString(&pb.IndexInfo{})} prefix[IndexMetaPrefix] = []string{proto.MarshalTextString(&pb.IndexInfo{})}
m1, err := NewMetaTable(k1) m1, err := NewMetaTable(k1)

View File

@ -124,7 +124,7 @@ func (p *ParamTable) initKvRootPath() {
} }
func (p *ParamTable) initMsgChannelSubName() { func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix") name, err := p.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -132,7 +132,7 @@ func (p *ParamTable) initMsgChannelSubName() {
} }
func (p *ParamTable) initTimeTickChannel() { func (p *ParamTable) initTimeTickChannel() {
channel, err := p.Load("msgChannel.chanNamePrefix.masterTimeTick") channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordTimeTick")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -140,7 +140,7 @@ func (p *ParamTable) initTimeTickChannel() {
} }
func (p *ParamTable) initStatisticsChannelName() { func (p *ParamTable) initStatisticsChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.masterStatistics") channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordStatistics")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -156,11 +156,11 @@ func (p *ParamTable) initSegmentInfoChannelName() {
} }
func (p *ParamTable) initMaxPartitionNum() { func (p *ParamTable) initMaxPartitionNum() {
p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum") p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum")
} }
func (p *ParamTable) initMinSegmentSizeToEnableIndex() { func (p *ParamTable) initMinSegmentSizeToEnableIndex() {
p.MinSegmentSizeToEnableIndex = p.ParseInt64("master.minSegmentSizeToEnableIndex") p.MinSegmentSizeToEnableIndex = p.ParseInt64("rootcoord.minSegmentSizeToEnableIndex")
} }
func (p *ParamTable) initDefaultPartitionName() { func (p *ParamTable) initDefaultPartitionName() {
@ -180,11 +180,11 @@ func (p *ParamTable) initDefaultIndexName() {
} }
func (p *ParamTable) initTimeout() { func (p *ParamTable) initTimeout() {
p.Timeout = p.ParseInt("master.timeout") p.Timeout = p.ParseInt("rootcoord.timeout")
} }
func (p *ParamTable) initTimeTickInterval() { func (p *ParamTable) initTimeTickInterval() {
p.TimeTickInterval = p.ParseInt("master.timeTickInterval") p.TimeTickInterval = p.ParseInt("rootcoord.timeTickInterval")
} }
func (p *ParamTable) initLogCfg() { func (p *ParamTable) initLogCfg() {
@ -207,7 +207,7 @@ func (p *ParamTable) initLogCfg() {
panic(err) panic(err)
} }
if len(rootPath) != 0 { if len(rootPath) != 0 {
p.Log.File.Filename = path.Join(rootPath, "masterservice.log") p.Log.File.Filename = path.Join(rootPath, "rootcoord.log")
} else { } else {
p.Log.File.Filename = "" p.Log.File.Filename = ""
} }

View File

@ -260,7 +260,7 @@ func getNotTtMsg(ctx context.Context, n int, ch <-chan *msgstream.MsgPack) []msg
} }
} }
func TestMasterService(t *testing.T) { func TestRootCoord(t *testing.T) {
const ( const (
dbName = "testDb" dbName = "testDb"
collName = "testColl" collName = "testColl"
@ -1740,7 +1740,7 @@ func TestMasterService(t *testing.T) {
}) })
} }
func TestMasterService2(t *testing.T) { func TestRootCoord2(t *testing.T) {
const ( const (
dbName = "testDb" dbName = "testDb"
collName = "testColl" collName = "testColl"

View File

@ -26,8 +26,8 @@ type InterceptorSuite struct {
var ( var (
filterFunc = func(ctx context.Context, fullMethodName string) bool { filterFunc = func(ctx context.Context, fullMethodName string) bool {
if fullMethodName == `/milvus.proto.master.MasterService/UpdateChannelTimeTick` || if fullMethodName == `/milvus.proto.root_coord.RootCoord/UpdateChannelTimeTick` ||
fullMethodName == `/milvus.proto.master.MasterService/AllocTimestamp` { fullMethodName == `/milvus.proto.root_coord.RootCoord/AllocTimestamp` {
return false return false
} }
return true return true

View File

@ -1,22 +1,22 @@
cd ../build/docker/deploy/ cd ../build/docker/deploy/
echo "starting master docker" echo "starting rootcoord docker"
nohup docker-compose -p milvus up master > ~/master_docker.log 2>&1 & nohup docker-compose -p milvus up rootcoord > ~/rootcoord_docker.log 2>&1 &
echo "starting proxynode docker" echo "starting proxy docker"
nohup docker-compose -p milvus up proxynode > ~/proxynode_docker.log 2>&1 & nohup docker-compose -p milvus up proxy > ~/proxy_docker.log 2>&1 &
echo "starting indexservice docker" echo "starting indexcoord docker"
nohup docker-compose -p milvus up indexservice > ~/indexservice_docker.log 2>&1 & nohup docker-compose -p milvus up indexcoord > ~/indexcoord_docker.log 2>&1 &
echo "starting indexnode docker" echo "starting indexnode docker"
nohup docker-compose -p milvus up indexnode > ~/indexnode_docker.log 2>&1 & nohup docker-compose -p milvus up indexnode > ~/indexnode_docker.log 2>&1 &
echo "starting queryservice docker" echo "starting querycoord docker"
nohup docker-compose -p milvus up querycoord > ~/queryservice_docker.log 2>&1 & nohup docker-compose -p milvus up querycoord > ~/querycoord_docker.log 2>&1 &
echo "starting dataservice docker" echo "starting datacoord docker"
nohup docker-compose -p milvus up datacoord > ~/dataservice_docker.log 2>&1 & nohup docker-compose -p milvus up datacoord > ~/datacoord_docker.log 2>&1 &
echo "starting querynode1 docker" echo "starting querynode1 docker"
nohup docker-compose -p milvus run -e QUERY_NODE_ID=1 querynode > ~/querynode1_docker.log 2>&1 & nohup docker-compose -p milvus run -e QUERY_NODE_ID=1 querynode > ~/querynode1_docker.log 2>&1 &