diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 10e2efdb79..d85809689c 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -178,7 +178,7 @@ func (i *IndexCoord) Init() error { return } log.Debug("IndexCoord try to connect etcd success") - i.nodeManager = NewNodeManager() + i.nodeManager = NewNodeManager(i.loopCtx) sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision)) diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 96933873d5..f3df1df544 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -19,10 +19,10 @@ package indexcoord import ( "context" "sync" - - "github.com/milvus-io/milvus/internal/util/metricsinfo" + "time" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/metricsinfo" grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" "github.com/milvus-io/milvus/internal/log" @@ -34,26 +34,24 @@ import ( type NodeManager struct { nodeClients map[UniqueID]types.IndexNode pq *PriorityQueue - - lock sync.RWMutex + lock sync.RWMutex + ctx context.Context } // NewNodeManager is used to create a new NodeManager. -func NewNodeManager() *NodeManager { +func NewNodeManager(ctx context.Context) *NodeManager { return &NodeManager{ nodeClients: make(map[UniqueID]types.IndexNode), pq: &PriorityQueue{ policy: PeekClientV1, }, lock: sync.RWMutex{}, + ctx: ctx, } } // setClient sets IndexNode client to node manager. 
func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error { - nm.lock.Lock() - defer nm.lock.Unlock() - log.Debug("IndexCoord NodeManager setClient", zap.Int64("nodeID", nodeID)) defer log.Debug("IndexNode NodeManager setclient success", zap.Any("nodeID", nodeID)) item := &PQItem{ @@ -62,18 +60,19 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error weight: 0, totalMem: 0, } + nm.lock.Lock() nm.nodeClients[nodeID] = client + nm.lock.Unlock() nm.pq.Push(item) return nil } // RemoveNode removes the unused client of IndexNode. func (nm *NodeManager) RemoveNode(nodeID UniqueID) { - nm.lock.Lock() - defer nm.lock.Unlock() - log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID)) + nm.lock.Lock() delete(nm.nodeClients, nodeID) + nm.lock.Unlock() nm.pq.Remove(nodeID) } @@ -100,9 +99,6 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { // PeekClient peeks the client with the least load. func (nm *NodeManager) PeekClient(meta Meta) (UniqueID, types.IndexNode) { - nm.lock.Lock() - defer nm.lock.Unlock() - log.Debug("IndexCoord NodeManager PeekClient") dim, err := getDimension(meta.indexMeta.Req) @@ -120,6 +116,8 @@ func (nm *NodeManager) PeekClient(meta Meta) (UniqueID, types.IndexNode) { log.Error("No IndexNode available", zap.Uint64("data size", dataSize), zap.Uint64("IndexNode must have memory size", dataSize*indexSizeFactor)) } + nm.lock.Lock() + defer nm.lock.Unlock() client, ok := nm.nodeClients[nodeID] if !ok { log.Error("IndexCoord NodeManager PeekClient", zap.Int64("There is no IndexNode client corresponding to NodeID", nodeID)) @@ -131,34 +129,57 @@ func (nm *NodeManager) PeekClient(meta Meta) (UniqueID, types.IndexNode) { // ListNode list all IndexNodes in node manager. 
func (nm *NodeManager) ListNode() []UniqueID { - nm.lock.Lock() - defer nm.lock.Unlock() - clients := []UniqueID{} + //nm.lock.Lock() + //defer nm.lock.Unlock() + var clientIDs []UniqueID + nm.lock.RLock() for id := range nm.nodeClients { - clients = append(clients, id) - if item := (nm.pq.getItemByKey(id)).(*PQItem); item.totalMem == 0 { + clientIDs = append(clientIDs, id) + } + + nm.lock.RUnlock() + var wg sync.WaitGroup + + for _, id := range clientIDs { + memory := nm.pq.GetMemory(id) + if memory == 0 { req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) if err != nil { log.Error("create metrics request failed", zap.Error(err)) continue } - metrics, err := nm.nodeClients[id].GetMetrics(context.Background(), req) - if err != nil { - log.Error("get indexnode metrics failed", zap.Error(err)) - continue - } - infos := &metricsinfo.IndexNodeInfos{} - err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos) - if err != nil { - log.Error("get indexnode metrics info failed", zap.Error(err)) + nm.lock.RLock() + client, ok := nm.nodeClients[id] + if !ok { + nm.lock.RUnlock() + log.Debug("NodeManager ListNode find client not exist") continue } - nm.pq.SetMemory(id, infos.HardwareInfos.Memory) + nm.lock.RUnlock() + + wg.Add(1) + go func(group *sync.WaitGroup, id UniqueID) { + defer group.Done() + ctx, cancel := context.WithTimeout(nm.ctx, time.Second*5) + defer cancel() + metrics, err := client.GetMetrics(ctx, req) + if err != nil { + log.Error("get IndexNode metrics failed", zap.Error(err)) + return + } + infos := &metricsinfo.IndexNodeInfos{} + err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos) + if err != nil { + log.Error("get IndexNode metrics info failed", zap.Error(err)) + return + } + nm.pq.SetMemory(id, infos.HardwareInfos.Memory) + }(&wg, id) } - } - return clients + wg.Wait() + return clientIDs } // indexNodeGetMetricsResponse record the metrics information of IndexNode. 
@@ -169,17 +190,20 @@ type indexNodeGetMetricsResponse struct { // getMetrics get metrics information of all IndexNode. func (nm *NodeManager) getMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) []indexNodeGetMetricsResponse { + var clients []types.IndexNode nm.lock.RLock() - defer nm.lock.RUnlock() + for _, node := range nm.nodeClients { + clients = append(clients, node) + } + nm.lock.RUnlock() ret := make([]indexNodeGetMetricsResponse, 0, len(nm.nodeClients)) - for _, node := range nm.nodeClients { + for _, node := range clients { resp, err := node.GetMetrics(ctx, req) ret = append(ret, indexNodeGetMetricsResponse{ resp: resp, err: err, }) } - return ret } diff --git a/internal/indexcoord/priority_queue.go b/internal/indexcoord/priority_queue.go index 4b06be03ab..6203b6e804 100644 --- a/internal/indexcoord/priority_queue.go +++ b/internal/indexcoord/priority_queue.go @@ -167,6 +167,19 @@ func (pq *PriorityQueue) PeekAll() []UniqueID { return ret } +// GetMemory gets the memory info for the specified key. +func (pq *PriorityQueue) GetMemory(key UniqueID) uint64 { + pq.lock.RLock() + defer pq.lock.RUnlock() + + for i := range pq.items { + if pq.items[i].key == key { + return pq.items[i].totalMem + } + } + return 0 +} + // SetMemory sets the memory info for IndexNode. func (pq *PriorityQueue) SetMemory(key UniqueID, memorySize uint64) { pq.lock.Lock()