diff --git a/internal/querycoord/channel_allocator.go b/internal/querycoord/channel_allocator.go
index e29cdcea95..f5e91d58d0 100644
--- a/internal/querycoord/channel_allocator.go
+++ b/internal/querycoord/channel_allocator.go
@@ -42,7 +42,7 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
 	var onlineNodeIDs []int64
 	for {
 		if replicaID == -1 {
-			onlineNodeIDs = cluster.onlineNodeIDs()
+			onlineNodeIDs = cluster.OnlineNodeIDs()
 		} else {
 			replica, err := metaCache.getReplicaByID(replicaID)
 			if err != nil {
@@ -50,7 +50,7 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
 			}
 			replicaNodes := replica.GetNodeIds()
 			for _, nodeID := range replicaNodes {
-				if ok, err := cluster.isOnline(nodeID); err == nil && ok {
+				if ok, err := cluster.IsOnline(nodeID); err == nil && ok {
 					onlineNodeIDs = append(onlineNodeIDs, nodeID)
 				}
 			}
diff --git a/internal/querycoord/channel_allocator_test.go b/internal/querycoord/channel_allocator_test.go
index 9477c5f742..eb376821f5 100644
--- a/internal/querycoord/channel_allocator_test.go
+++ b/internal/querycoord/channel_allocator_test.go
@@ -50,7 +50,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
 	}
 	meta, err := newMeta(baseCtx, kv, nil, idAllocator)
 	assert.Nil(t, err)
-	cluster := &queryNodeCluster{
+	var cluster Cluster = &queryNodeCluster{
 		ctx:    baseCtx,
 		cancel: cancel,
 		client: kv,
@@ -87,7 +87,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
 	assert.Nil(t, err)
 	nodeSession := node.session
 	nodeID := node.queryNodeID
-	cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
+	cluster.RegisterNode(baseCtx, nodeSession, nodeID, disConnect)
 	waitQueryNodeOnline(cluster, nodeID)

 	err = shuffleChannelsToQueryNode(baseCtx, reqs, cluster, meta, false, nil, nil, -1)
diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go
index f1bf2b2773..6205dfd3d1 100644
--- a/internal/querycoord/cluster.go
+++ b/internal/querycoord/cluster.go
@@ -46,40 +46,43 @@ const (

 // Cluster manages all query node connections and grpc requests
 type Cluster interface {
+	// Collection/Partition
+	ReleaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
+	ReleasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
+
+	// Segment
+	LoadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
+	ReleaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
+	GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
+	GetSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
+	GetSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error)
+	SyncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error
+
+	// Channel
+	WatchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
+	WatchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error
+	HasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
+
+	// Node
+	RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
+	GetNodeInfoByID(nodeID int64) (Node, error)
+	RemoveNodeInfo(nodeID int64) error
+	StopNode(nodeID int64)
+	OnlineNodeIDs() []int64
+	IsOnline(nodeID int64) (bool, error)
+	OfflineNodeIDs() []int64
+	HasNode(nodeID int64) bool
+	GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
+
+	AllocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
+	AllocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
+	AssignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error
+
+	GetSessionVersion() int64
+
+	// Inner
 	reloadFromKV() error
 	getComponentInfos(ctx context.Context) []*internalpb.ComponentInfo
-
-	loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
-	releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
-
-	watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
-	watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error
-
-	hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
-	releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
-	releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
-	getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
-	getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
-	getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error)
-	syncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error
-
-	registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
-	getNodeInfoByID(nodeID int64) (Node, error)
-	removeNodeInfo(nodeID int64) error
-	stopNode(nodeID int64)
-	onlineNodeIDs() []int64
-	isOnline(nodeID int64) (bool, error)
-	offlineNodeIDs() []int64
-	hasNode(nodeID int64) bool
-
-	allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
-	allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
-
-	assignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error
-
-	getSessionVersion() int64
-
-	getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
 }

 type newQueryNodeFn func(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error)
@@ -135,7 +138,6 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
 // Reload trigger task, trigger task states, internal task, internal task state from etcd
 // Assign the internal task to the corresponding trigger task as a child task
 func (c *queryNodeCluster) reloadFromKV() error {
-	toLoadMetaNodeIDs := make([]int64, 0)
 	// get current online session
 	onlineNodeSessions, version, _ := c.session.GetSessions(typeutil.QueryNodeRole)
 	onlineSessionMap := make(map[int64]*sessionutil.Session)
@@ -145,12 +147,11 @@ func (c *queryNodeCluster) reloadFromKV() error {
 	}
 	for nodeID, session := range onlineSessionMap {
 		log.Info("reloadFromKV: register a queryNode to cluster",
 			zap.Any("nodeID", nodeID))
-		err := c.registerNode(c.ctx, session, nodeID, disConnect)
+		err := c.RegisterNode(c.ctx, session, nodeID, disConnect)
 		if err != nil {
 			log.Warn("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
 			return err
 		}
-		toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
 	}
 	c.sessionVersion = version
@@ -173,19 +174,18 @@ func (c *queryNodeCluster) reloadFromKV() error {
 				log.Warn("watchNodeLoop: unmarshal session error", zap.Error(err))
 				return err
 			}
-			err = c.registerNode(context.Background(), session, nodeID, offline)
+			err = c.RegisterNode(context.Background(), session, nodeID, offline)
 			if err != nil {
 				log.Warn("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
 				return err
 			}
-			toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
 		}
 	}

 	return nil
 }

-func (c *queryNodeCluster) getSessionVersion() int64 {
+func (c *queryNodeCluster) GetSessionVersion() int64 {
 	return c.sessionVersion
 }
@@ -201,7 +201,7 @@ func (c *queryNodeCluster) getComponentInfos(ctx context.Context) []*internalpb.
 	return subComponentInfos
 }

-func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
+func (c *queryNodeCluster) LoadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[nodeID]; ok {
@@ -221,7 +221,7 @@ func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *q
 	return fmt.Errorf("loadSegments: can't find QueryNode by nodeID, nodeID = %d", nodeID)
 }

-func (c *queryNodeCluster) releaseSegments(ctx context.Context, leaderID int64, in *querypb.ReleaseSegmentsRequest) error {
+func (c *queryNodeCluster) ReleaseSegments(ctx context.Context, leaderID int64, in *querypb.ReleaseSegmentsRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[leaderID]; ok {
@@ -246,7 +246,7 @@ func (c *queryNodeCluster) releaseSegments(ctx context.Context, leaderID int64,
 	return fmt.Errorf("releaseSegments: can't find QueryNode by nodeID, nodeID = %d", leaderID)
 }

-func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
+func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[nodeID]; ok {
@@ -281,7 +281,7 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in
 	return fmt.Errorf("watchDmChannels: can't find QueryNode by nodeID, nodeID = %d", nodeID)
 }

-func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
+func (c *queryNodeCluster) WatchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[nodeID]; ok {
@@ -301,14 +301,14 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64,
 	return fmt.Errorf("watchDeltaChannels: can't find QueryNode by nodeID, nodeID = %d", nodeID)
 }

-func (c *queryNodeCluster) hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
+func (c *queryNodeCluster) HasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
 	c.RLock()
 	defer c.RUnlock()

 	return c.nodes[nodeID].hasWatchedDeltaChannel(collectionID)
 }

-func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
+func (c *queryNodeCluster) ReleaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[nodeID]; ok {
@@ -328,7 +328,7 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
 	return fmt.Errorf("releaseCollection: can't find QueryNode by nodeID, nodeID = %d", nodeID)
 }

-func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
+func (c *queryNodeCluster) ReleasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
 	c.RLock()
 	var targetNode Node
 	if node, ok := c.nodes[nodeID]; ok {
@@ -348,7 +348,7 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
 	return fmt.Errorf("releasePartitions: can't find QueryNode by nodeID, nodeID = %d", nodeID)
 }

-func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
+func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
 	segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
 	if err != nil {
 		return nil, err
@@ -381,7 +381,7 @@ func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID Uni
 	return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on QueryNode %d", segmentID, segmentInfo.NodeID)
 }

-func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
+func (c *queryNodeCluster) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
 	type respTuple struct {
 		res *querypb.GetSegmentInfoResponse
 		err error
@@ -447,7 +447,7 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
 	return segmentInfos, nil
 }

-func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
+func (c *queryNodeCluster) GetSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
 	c.RLock()
 	node, ok := c.nodes[nodeID]
 	c.RUnlock()
@@ -462,7 +462,7 @@ func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int6
 	return res.GetInfos(), nil
 }

-func (c *queryNodeCluster) syncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error {
+func (c *queryNodeCluster) SyncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error {
 	c.RLock()
 	leader, ok := c.nodes[leaderID]
 	c.RUnlock()
@@ -478,7 +478,7 @@ type queryNodeGetMetricsResponse struct {
 	err error
 }

-func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
+func (c *queryNodeCluster) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
 	c.RLock()
 	var wg sync.WaitGroup
 	cnt := len(c.nodes)
@@ -527,7 +527,7 @@ func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState
 	node.setState(state)
 }

-func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
+func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
 	c.Lock()
 	defer c.Unlock()

@@ -559,7 +559,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
 	return fmt.Errorf("registerNode: QueryNode %d alredy exists in cluster", id)
 }

-func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
+func (c *queryNodeCluster) GetNodeInfoByID(nodeID int64) (Node, error) {
 	c.RLock()
 	node, ok := c.nodes[nodeID]
 	c.RUnlock()
@@ -574,7 +574,7 @@ func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
 	return nodeInfo, nil
 }

-func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
+func (c *queryNodeCluster) RemoveNodeInfo(nodeID int64) error {
 	c.Lock()
 	defer c.Unlock()

@@ -591,7 +591,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
 	return nil
 }

-func (c *queryNodeCluster) stopNode(nodeID int64) {
+func (c *queryNodeCluster) StopNode(nodeID int64) {
 	c.RLock()
 	defer c.RUnlock()

@@ -602,7 +602,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) {
 	}
 }

-func (c *queryNodeCluster) onlineNodeIDs() []int64 {
+func (c *queryNodeCluster) OnlineNodeIDs() []int64 {
 	c.RLock()
 	defer c.RUnlock()

@@ -616,7 +616,7 @@ func (c *queryNodeCluster) onlineNodeIDs() []int64 {
 	return onlineNodeIDs
 }

-func (c *queryNodeCluster) offlineNodeIDs() []int64 {
+func (c *queryNodeCluster) OfflineNodeIDs() []int64 {
 	c.RLock()
 	defer c.RUnlock()

@@ -630,7 +630,7 @@ func (c *queryNodeCluster) offlineNodeIDs() []int64 {
 	return offlineNodeIDs
 }

-func (c *queryNodeCluster) hasNode(nodeID int64) bool {
+func (c *queryNodeCluster) HasNode(nodeID int64) bool {
 	c.RLock()
 	defer c.RUnlock()

@@ -641,7 +641,7 @@ func (c *queryNodeCluster) hasNode(nodeID int64) bool {
 	return false
 }

-func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
+func (c *queryNodeCluster) IsOnline(nodeID int64) (bool, error) {
 	c.RLock()
 	defer c.RUnlock()

@@ -667,17 +667,17 @@
 //	}
 //}

-func (c *queryNodeCluster) allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
+func (c *queryNodeCluster) AllocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
 	return c.segmentAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs, includeNodeIDs, replicaID)
 }

-func (c *queryNodeCluster) allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
+func (c *queryNodeCluster) AllocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
 	return c.channelAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs, includeNodeIDs, replicaID)
 }

 // Return error if no enough nodes/resources to create replicas
-func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error {
-	nodeIds := c.onlineNodeIDs()
+func (c *queryNodeCluster) AssignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error {
+	nodeIds := c.OnlineNodeIDs()
 	if len(nodeIds) < len(replicas) {
 		return fmt.Errorf("no enough nodes to create replicas, node_num=%d replica_num=%d", len(nodeIds), len(replicas))
 	}
@@ -725,7 +725,7 @@ func getNodeInfos(cluster Cluster, nodeIds []UniqueID) []*queryNode {
 		wg.Add(1)
 		go func(id UniqueID) {
 			defer wg.Done()
-			info, err := cluster.getNodeInfoByID(id)
+			info, err := cluster.GetNodeInfoByID(id)
 			if err != nil {
 				return
 			}
diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go
index abed2c11a4..d67dbc9acc 100644
--- a/internal/querycoord/cluster_test.go
+++ b/internal/querycoord/cluster_test.go
@@ -430,7 +430,7 @@ func TestGrpcRequest(t *testing.T) {
 	handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
 	assert.Nil(t, err)

-	cluster := &queryNodeCluster{
+	var cluster Cluster = &queryNodeCluster{
 		ctx:    baseCtx,
 		cancel: cancel,
 		client: kv,
@@ -442,7 +442,7 @@ func TestGrpcRequest(t *testing.T) {
 	}

 	t.Run("Test GetNodeInfoByIDWithNodeNotExist", func(t *testing.T) {
-		_, err := cluster.getNodeInfoByID(defaultQueryNodeID)
+		_, err := cluster.GetNodeInfoByID(defaultQueryNodeID)
 		assert.NotNil(t, err)
 	})

@@ -453,7 +453,7 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfoByNode(baseCtx, defaultQueryNodeID, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfoByNode(baseCtx, defaultQueryNodeID, getSegmentInfoReq)
 		assert.NotNil(t, err)
 	})

@@ -461,7 +461,7 @@ func TestGrpcRequest(t *testing.T) {
 	assert.Nil(t, err)
 	nodeSession := node.session
 	nodeID := node.queryNodeID
-	cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
+	cluster.RegisterNode(baseCtx, nodeSession, nodeID, disConnect)
 	waitQueryNodeOnline(cluster, nodeID)

 	t.Run("Test GetComponentInfos", func(t *testing.T) {
@@ -481,7 +481,7 @@ func TestGrpcRequest(t *testing.T) {
 			Schema:       genDefaultCollectionSchema(false),
 			CollectionID: defaultCollectionID,
 		}
-		err := cluster.loadSegments(baseCtx, nodeID, loadSegmentReq)
+		err := cluster.LoadSegments(baseCtx, nodeID, loadSegmentReq)
 		assert.Nil(t, err)
 	})

@@ -492,7 +492,7 @@ func TestGrpcRequest(t *testing.T) {
 			PartitionIDs: []UniqueID{defaultPartitionID},
 			SegmentIDs:   []UniqueID{defaultSegmentID},
 		}
-		err := cluster.releaseSegments(baseCtx, nodeID, releaseSegmentReq)
+		err := cluster.ReleaseSegments(baseCtx, nodeID, releaseSegmentReq)
 		assert.Nil(t, err)
 	})

@@ -503,7 +503,7 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfo(baseCtx, getSegmentInfoReq)
 		assert.Nil(t, err)
 	})

@@ -514,7 +514,7 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
 		assert.Nil(t, err)
 	})

@@ -527,7 +527,7 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfo(baseCtx, getSegmentInfoReq)
 		assert.NotNil(t, err)
 	})

@@ -538,14 +538,14 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
 		assert.NotNil(t, err)
 	})

 	node.getSegmentInfos = returnSuccessGetSegmentInfoResult

 	t.Run("Test GetNodeInfoByID", func(t *testing.T) {
-		res, err := cluster.getNodeInfoByID(nodeID)
+		res, err := cluster.GetNodeInfoByID(nodeID)
 		assert.Nil(t, err)
 		assert.NotNil(t, res)
 	})
@@ -553,13 +553,13 @@ func TestGrpcRequest(t *testing.T) {
 	node.getMetrics = returnFailedGetMetricsResult

 	t.Run("Test GetNodeInfoByIDFailed", func(t *testing.T) {
-		_, err := cluster.getNodeInfoByID(nodeID)
+		_, err := cluster.GetNodeInfoByID(nodeID)
 		assert.NotNil(t, err)
 	})

 	node.getMetrics = returnSuccessGetMetricsResult

-	cluster.stopNode(nodeID)
+	cluster.StopNode(nodeID)
 	t.Run("Test GetSegmentInfoByNodeAfterNodeStop", func(t *testing.T) {
 		getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
 			Base: &commonpb.MsgBase{
@@ -567,7 +567,7 @@ func TestGrpcRequest(t *testing.T) {
 			},
 			CollectionID: defaultCollectionID,
 		}
-		_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
+		_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
 		assert.NotNil(t, err)
 	})

@@ -608,7 +608,7 @@ func TestSetNodeState(t *testing.T) {
 	node, err := startQueryNodeServer(baseCtx)
 	assert.Nil(t, err)

-	err = cluster.registerNode(baseCtx, node.session, node.queryNodeID, disConnect)
+	err = cluster.RegisterNode(baseCtx, node.session, node.queryNodeID, disConnect)
 	assert.Nil(t, err)
 	waitQueryNodeOnline(cluster, node.queryNodeID)

@@ -627,7 +627,7 @@ func TestSetNodeState(t *testing.T) {
 	err = meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{deltaChannelInfo})
 	assert.Nil(t, err)

-	nodeInfo, err := cluster.getNodeInfoByID(node.queryNodeID)
+	nodeInfo, err := cluster.GetNodeInfoByID(node.queryNodeID)
 	assert.Nil(t, err)
 	cluster.setNodeState(node.queryNodeID, nodeInfo, offline)
 	assert.Equal(t, 1, len(handler.downNodeChan))
diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go
index 4274d59003..26e69faa27 100644
--- a/internal/querycoord/impl.go
+++ b/internal/querycoord/impl.go
@@ -850,7 +850,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
 	//TODO::get segment infos from MetaReplica
 	//segmentIDs := req.SegmentIDs
 	//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
-	segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
+	segmentInfos, err := qc.cluster.GetSegmentInfo(ctx, req)
 	if err != nil {
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
@@ -1134,7 +1134,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
 			}
 		}

-		isShardAvailable, err := qc.cluster.isOnline(shard.LeaderID)
+		isShardAvailable, err := qc.cluster.IsOnline(shard.LeaderID)
 		if err != nil || !isShardAvailable {
 			log.Warn("shard leader is unavailable",
 				zap.Int64("collectionID", replica.CollectionID),
@@ -1148,7 +1148,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
 		nodes := shardNodes[shard.DmChannelName]
 		for _, nodeID := range replica.NodeIds {
 			if _, ok := nodes[nodeID]; ok {
-				if ok, err := qc.cluster.isOnline(nodeID); err != nil || !ok {
+				if ok, err := qc.cluster.IsOnline(nodeID); err != nil || !ok {
 					isShardAvailable = false
 					break
 				}
diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go
index 3304982f84..97f7516af5 100644
--- a/internal/querycoord/impl_test.go
+++ b/internal/querycoord/impl_test.go
@@ -441,7 +441,7 @@ func TestGrpcTaskEnqueueFail(t *testing.T) {
 	queryCoord.scheduler.taskIDAllocator = failedAllocator

 	waitQueryNodeOnline(queryCoord.cluster, queryNode.queryNodeID)
-	assert.NotEmpty(t, queryCoord.cluster.onlineNodeIDs())
+	assert.NotEmpty(t, queryCoord.cluster.OnlineNodeIDs())

 	t.Run("Test LoadPartition", func(t *testing.T) {
 		status, err := queryCoord.LoadPartitions(ctx, &querypb.LoadPartitionsRequest{
@@ -579,7 +579,7 @@ func TestLoadBalanceTask(t *testing.T) {
 		}
 	}
 	nodeID := queryNode1.queryNodeID
-	queryCoord.cluster.stopNode(nodeID)
+	queryCoord.cluster.StopNode(nodeID)
 	loadBalanceSegment := &querypb.LoadBalanceRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_LoadBalanceSegments,
@@ -914,7 +914,7 @@ func TestLoadCollectionWithReplicas(t *testing.T) {
 	}

 	// load collection with 3 replicas, but no enough querynodes
-	assert.Equal(t, 2, len(queryCoord.cluster.onlineNodeIDs()))
+	assert.Equal(t, 2, len(queryCoord.cluster.OnlineNodeIDs()))
 	status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
 	assert.NoError(t, err)
 	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
@@ -988,7 +988,7 @@ func TestLoadPartitionsWithReplicas(t *testing.T) {
 	}

 	// load collection with 3 replicas, but no enough querynodes
-	assert.Equal(t, 2, len(queryCoord.cluster.onlineNodeIDs()))
+	assert.Equal(t, 2, len(queryCoord.cluster.OnlineNodeIDs()))
 	status, err := queryCoord.LoadPartitions(ctx, loadPartitionsReq)
 	assert.NoError(t, err)
 	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go
index 5e2cfbcd06..4a1f6c5387 100644
--- a/internal/querycoord/meta.go
+++ b/internal/querycoord/meta.go
@@ -306,7 +306,7 @@ func reloadShardLeaderAddress(meta Meta, cluster Cluster) error {
 		isModified := false
 		for _, shard := range replica.ShardReplicas {
 			if len(shard.LeaderAddr) == 0 {
-				nodeInfo, err := cluster.getNodeInfoByID(shard.LeaderID)
+				nodeInfo, err := cluster.GetNodeInfoByID(shard.LeaderID)
 				if err != nil {
 					log.Warn("failed to retrieve the node's address",
 						zap.Int64("nodeID", shard.LeaderID),
diff --git a/internal/querycoord/metrics_info.go b/internal/querycoord/metrics_info.go
index 7781f84775..cb2b840c1a 100644
--- a/internal/querycoord/metrics_info.go
+++ b/internal/querycoord/metrics_info.go
@@ -66,7 +66,7 @@ func getSystemInfoMetrics(
 	}
 	metricsinfo.FillDeployMetricsWithEnv(&clusterTopology.Self.SystemInfo)

-	nodesMetrics := qc.cluster.getMetrics(ctx, req)
+	nodesMetrics := qc.cluster.GetMetrics(ctx, req)
 	for _, nodeMetrics := range nodesMetrics {
 		if nodeMetrics.err != nil {
 			log.Warn("invalid metrics of query node was found",
diff --git a/internal/querycoord/mock_cluster.go b/internal/querycoord/mock_cluster.go
index 6c3ff85b88..e3fbc417dc 100644
--- a/internal/querycoord/mock_cluster.go
+++ b/internal/querycoord/mock_cluster.go
@@ -28,10 +28,10 @@ func NewMockCluster(cluster Cluster) *MockCluster {
 	}
 }

-func (mock *MockCluster) isOnline(nodeID int64) (bool, error) {
+func (mock *MockCluster) IsOnline(nodeID int64) (bool, error) {
 	if mock.isOnlineHandler != nil {
 		return mock.isOnlineHandler(nodeID)
 	}

-	return mock.Cluster.isOnline(nodeID)
+	return mock.Cluster.IsOnline(nodeID)
 }
diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go
index a3bb4608dc..2367ef90f0 100644
--- a/internal/querycoord/query_coord.go
+++ b/internal/querycoord/query_coord.go
@@ -346,7 +346,7 @@ func (qc *QueryCoord) watchNodeLoop() {
 		}
 	}

-	offlineNodeIDs := qc.cluster.offlineNodeIDs()
+	offlineNodeIDs := qc.cluster.OfflineNodeIDs()
 	if len(offlineNodeIDs) != 0 {
 		loadBalanceSegment := &querypb.LoadBalanceRequest{
 			Base: &commonpb.MsgBase{
@@ -371,7 +371,7 @@ func (qc *QueryCoord) watchNodeLoop() {
 	}

 	// TODO silverxia add Rewatch logic
-	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
+	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.GetSessionVersion()+1, nil)
 	qc.handleNodeEvent(ctx)
 }

@@ -388,7 +388,7 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error {
 	return nil
 }
 func (qc *QueryCoord) getUnallocatedNodes() []int64 {
-	onlines := qc.cluster.onlineNodeIDs()
+	onlines := qc.cluster.OnlineNodeIDs()
 	var ret []int64
 	for _, n := range onlines {
 		replica, err := qc.meta.getReplicasByNodeID(n)
@@ -429,7 +429,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
 			case sessionutil.SessionAddEvent:
 				serverID := event.Session.ServerID
 				log.Info("start add a QueryNode to cluster", zap.Any("nodeID", serverID))
-				err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
+				err := qc.cluster.RegisterNode(ctx, event.Session, serverID, disConnect)
 				if err != nil {
 					log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
 					continue
@@ -444,13 +444,13 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
 			case sessionutil.SessionDelEvent:
 				serverID := event.Session.ServerID
 				log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
-				nodeExist := qc.cluster.hasNode(serverID)
+				nodeExist := qc.cluster.HasNode(serverID)
 				if !nodeExist {
 					log.Error("QueryNode not exist", zap.Int64("nodeID", serverID))
 					continue
 				}

-				qc.cluster.stopNode(serverID)
+				qc.cluster.StopNode(serverID)
 				offlineNodeCh <- serverID
 			}
 		}
@@ -599,7 +599,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 				nodeID2SegmentInfos := make(map[int64]map[UniqueID]*querypb.SegmentInfo)
 				for _, nodeID := range onlineNodeIDs {
 					if _, ok := nodeID2MemUsage[nodeID]; !ok {
-						nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
+						nodeInfo, err := qc.cluster.GetNodeInfoByID(nodeID)
 						if err != nil {
 							log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed", zap.Int64("nodeID", nodeID),
 								zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
@@ -615,7 +615,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 					leastSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
 					segmentInfos := qc.meta.getSegmentInfosByNodeAndCollection(nodeID, replica.GetCollectionID())
 					for _, segmentInfo := range segmentInfos {
-						leastInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
+						leastInfo, err := qc.cluster.GetSegmentInfoByID(ctx, segmentInfo.SegmentID)
 						if err != nil {
 							log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID),
 								zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go
index e0b9ded682..48f89dd634 100644
--- a/internal/querycoord/query_coord_test.go
+++ b/internal/querycoord/query_coord_test.go
@@ -187,7 +187,7 @@ func TestWatchNodeLoop(t *testing.T) {
 	assert.Nil(t, err)

 	for {
-		offlineNodeIDs := queryCoord.cluster.offlineNodeIDs()
+		offlineNodeIDs := queryCoord.cluster.OfflineNodeIDs()
 		if len(offlineNodeIDs) != 0 {
 			log.Warn("find offline Nodes", zap.Int64s("offlineNodeIDs", offlineNodeIDs))
 			break
@@ -233,7 +233,7 @@ func TestWatchNodeLoop(t *testing.T) {
 	nodeID := queryNode1.queryNodeID
 	waitQueryNodeOnline(queryCoord.cluster, nodeID)

-	onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
+	onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
 	assert.Equal(t, 1, len(onlineNodeIDs))

 	queryNode1.stop()
@@ -598,7 +598,7 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
 		err = queryCoord.scheduler.Enqueue(loadPartitionTask)
 		assert.Nil(t, err)
 		waitTaskFinalState(loadPartitionTask, taskExpired)
-		nodeInfo, err := queryCoord.cluster.getNodeInfoByID(queryNode1.queryNodeID)
+		nodeInfo, err := queryCoord.cluster.GetNodeInfoByID(queryNode1.queryNodeID)
 		assert.Nil(t, err)
 		if nodeInfo.(*queryNode).memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
 			break
@@ -612,7 +612,7 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {

 	// if sealed has been balance to query node2, than balance work
 	for {
-		segmentInfos, err := queryCoord.cluster.getSegmentInfoByNode(baseCtx, queryNode2.queryNodeID, &querypb.GetSegmentInfoRequest{
+		segmentInfos, err := queryCoord.cluster.GetSegmentInfoByNode(baseCtx, queryNode2.queryNodeID, &querypb.GetSegmentInfoRequest{
 			Base: &commonpb.MsgBase{
 				MsgType: commonpb.MsgType_LoadBalanceSegments,
 			},
diff --git a/internal/querycoord/querynode_test.go b/internal/querycoord/querynode_test.go
index d9f03674cf..fd0c3672dd 100644
--- a/internal/querycoord/querynode_test.go
+++ b/internal/querycoord/querynode_test.go
@@ -69,7 +69,7 @@ func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {
 	for {
 		allOffline := true
 		for _, nodeID := range nodeIDs {
-			isOnline, err := cluster.isOnline(nodeID)
+			isOnline, err := cluster.IsOnline(nodeID)
 			if err == nil && isOnline {
 				allOffline = false
 				break
@@ -85,7 +85,7 @@ func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {

 func waitQueryNodeOnline(cluster Cluster, nodeID int64) {
 	for {
-		online, err := cluster.isOnline(nodeID)
+		online, err := cluster.IsOnline(nodeID)
 		if err != nil {
 			continue
 		}
@@ -130,7 +130,7 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
 	})
 	assert.Nil(t, err)
 	time.Sleep(100 * time.Millisecond)
-	onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
+	onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
 	assert.NotEqual(t, 0, len(onlineNodeIDs))
 	queryNode2.stop()
 	err = removeNodeSession(queryNode2.queryNodeID)
@@ -176,7 +176,7 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
 		CollectionID: defaultCollectionID,
 	})
 	assert.Nil(t, err)
-	onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
+	onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
 	assert.NotEqual(t, 0, len(onlineNodeIDs))
 	queryNode3.stop()
 	err = removeNodeSession(queryNode3.queryNodeID)
diff --git a/internal/querycoord/segment_allocator.go b/internal/querycoord/segment_allocator.go
index 3c3f9e776d..ab34e8b561 100644
--- a/internal/querycoord/segment_allocator.go
+++ b/internal/querycoord/segment_allocator.go
@@ -45,7 +45,7 @@ func shuffleSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegment
 	}

 	for {
-		onlineNodeIDs := cluster.onlineNodeIDs()
+		onlineNodeIDs := cluster.OnlineNodeIDs()
 		if len(onlineNodeIDs) == 0 {
 			err := errors.New("no online QueryNode to allocate")
 			log.Error("shuffleSegmentsToQueryNode failed", zap.Error(err))
@@ -117,7 +117,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
 	memUsageRate := make(map[int64]float64)
 	var onlineNodeIDs []int64
 	if replicaID == -1 {
-		onlineNodeIDs = cluster.onlineNodeIDs()
+		onlineNodeIDs = cluster.OnlineNodeIDs()
 	} else {
 		replica, err := metaCache.getReplicaByID(replicaID)
 		if err != nil {
@@ -125,7 +125,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
 		}
 		replicaNodes := replica.GetNodeIds()
 		for _, nodeID := range replicaNodes {
-			if ok, err := cluster.isOnline(nodeID); err == nil && ok {
+			if ok, err := cluster.IsOnline(nodeID); err == nil && ok {
 				onlineNodeIDs = append(onlineNodeIDs, nodeID)
 			}
 		}
@@ -148,7 +148,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
 			continue
 		}
 		// statistic nodeInfo, used memory, memory usage of every query node
-		nodeInfo, err := cluster.getNodeInfoByID(nodeID)
+		nodeInfo, err := cluster.GetNodeInfoByID(nodeID)
 		if err != nil {
 			log.Warn("shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed", zap.Error(err))
 			continue
diff --git a/internal/querycoord/segment_allocator_test.go b/internal/querycoord/segment_allocator_test.go
index 42ce453e71..2d55d426a9 100644
--- a/internal/querycoord/segment_allocator_test.go
+++ b/internal/querycoord/segment_allocator_test.go
@@ -99,7 +99,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
 	assert.Nil(t, err)
 	node1Session := node1.session
 	node1ID := node1.queryNodeID
-	cluster.registerNode(baseCtx, node1Session, node1ID, disConnect)
+	cluster.RegisterNode(baseCtx, node1Session, node1ID, disConnect)
 	waitQueryNodeOnline(cluster, node1ID)

 	t.Run("Test shuffleSegmentsToQueryNode", func(t *testing.T) {
@@ -114,9 +114,9 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
 	assert.Nil(t, err)
 	node2Session := node2.session
 	node2ID := node2.queryNodeID
-	cluster.registerNode(baseCtx, node2Session, node2ID, disConnect)
+	cluster.RegisterNode(baseCtx, node2Session, node2ID, disConnect)
 	waitQueryNodeOnline(cluster, node2ID)
-	cluster.stopNode(node1ID)
+	cluster.StopNode(node1ID)

 	t.Run("Test shuffleSegmentsToQueryNodeV2", func(t *testing.T) {
 		err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, meta, false, nil, nil, -1)
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index a998814b71..a7df14eb8e 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -336,7 +336,7 @@ func (lct *loadCollectionTask) updateTaskProcess() {
 			// wait watchDeltaChannel task done after loading segment
 			nodeID := getDstNodeIDByTask(t)
 			if t.msgType() == commonpb.MsgType_LoadSegments {
-				if !lct.cluster.hasWatchedDeltaChannel(lct.ctx, nodeID, collectionID) {
+				if !lct.cluster.HasWatchedDeltaChannel(lct.ctx, nodeID, collectionID) {
 					allDone = false
 					break
 				}
@@ -456,7 +456,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
 		replicaIds[i] = replica.ReplicaID
 	}

-	err = lct.cluster.assignNodesToReplicas(ctx, replicas, collectionSize)
+	err = lct.cluster.AssignNodesToReplicas(ctx, replicas, collectionSize)
 	if err != nil {
 		log.Error("failed to assign nodes to replicas",
 			zap.Int64("collectionID", collectionID),
@@ -524,7 +524,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
 	for _, internalTask := range internalTasks {
 		lct.addChildTask(internalTask)
 		if task, ok := internalTask.(*watchDmChannelTask); ok {
-			nodeInfo, err := lct.cluster.getNodeInfoByID(task.NodeID)
+			nodeInfo, err := lct.cluster.GetNodeInfoByID(task.NodeID)
 			if err != nil {
 				log.Error("loadCollectionTask: get shard leader node info failed",
 					zap.Int64("collectionID", collectionID),
@@ -593,7 +593,7 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
 }

 func (lct *loadCollectionTask) rollBack(ctx context.Context) []task {
-	onlineNodeIDs := lct.cluster.onlineNodeIDs()
+	onlineNodeIDs := lct.cluster.OnlineNodeIDs()
 	resultTasks := make([]task, 0)
 	for _, nodeID := range onlineNodeIDs {
 		//brute force rollBack, should optimize
@@ -686,7 +686,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
 		}

 		// TODO(yah01): broadcast to all nodes? Or only nodes serve the collection
-		onlineNodeIDs := rct.cluster.onlineNodeIDs()
+		onlineNodeIDs := rct.cluster.OnlineNodeIDs()
 		for _, nodeID := range onlineNodeIDs {
 			req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest)
 			req.NodeID = nodeID
@@ -704,7 +704,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
 	} else {
 		// If the node crashed or be offline, the loaded segments are lost
 		defer rct.reduceRetryCount()
-		err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
+		err := rct.cluster.ReleaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
 		if err != nil {
 			log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID))
 			// after release failed, the task will always redo
@@ -780,7 +780,7 @@ func (lpt *loadPartitionTask) updateTaskProcess() {
 			// wait watchDeltaChannel task done after loading segment
 			nodeID := getDstNodeIDByTask(t)
 			if t.msgType() == commonpb.MsgType_LoadSegments {
-				if !lpt.cluster.hasWatchedDeltaChannel(lpt.ctx, nodeID, collectionID) {
+				if !lpt.cluster.HasWatchedDeltaChannel(lpt.ctx, nodeID, collectionID) {
 					allDone = false
 					break
 				}
@@ -889,7 +889,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
 		replicaIds[i] = replica.ReplicaID
 	}

-	err = lpt.cluster.assignNodesToReplicas(ctx, replicas, collectionSize)
+	err = lpt.cluster.AssignNodesToReplicas(ctx, replicas, collectionSize)
 	if err != nil {
 		log.Error("failed to assign nodes to replicas",
 			zap.Int64("collectionID", collectionID),
@@ -954,7 +954,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
 	for _, internalTask := range internalTasks {
 		lpt.addChildTask(internalTask)
 		if task, ok := internalTask.(*watchDmChannelTask); ok {
-			nodeInfo, err := lpt.cluster.getNodeInfoByID(task.NodeID)
+			nodeInfo, err := lpt.cluster.GetNodeInfoByID(task.NodeID)
 			if err != nil {
 				log.Error("loadCollectionTask: get shard leader node info failed",
 					zap.Int64("collectionID", collectionID),
@@ -1031,7 +1031,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
 	collectionID := lpt.CollectionID
 	resultTasks := make([]task, 0)
 	//brute force rollBack, should optimize
-	onlineNodeIDs := lpt.cluster.onlineNodeIDs()
+	onlineNodeIDs := lpt.cluster.OnlineNodeIDs()
 	for _, nodeID := range onlineNodeIDs {
 		req := &querypb.ReleaseCollectionRequest{
 			Base: &commonpb.MsgBase{
@@ -1119,7 +1119,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {

 	// if nodeID ==0, it means that the release request has not been assigned to the specified query node
 	if rpt.NodeID <= 0 {
-		onlineNodeIDs := rpt.cluster.onlineNodeIDs()
+		onlineNodeIDs := rpt.cluster.OnlineNodeIDs()
 		for _, nodeID := range onlineNodeIDs {
 			req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest)
 			req.NodeID = nodeID
@@ -1137,7 +1137,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
 	} else {
 		// If the node crashed or be offline, the loaded segments are lost
 		defer rpt.reduceRetryCount()
-		err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
+		err := rpt.cluster.ReleasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
 		if err != nil {
 			log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.Int64("collectionID", collectionID), zap.String("nodeID", fmt.Sprintln(rpt.NodeID)))
 			// after release failed, the task will always redo
@@ -1195,7 +1195,7 @@ func (lst *loadSegmentTask) marshal() ([]byte, error) {
 }

 func (lst *loadSegmentTask) isValid() bool {
-	online, err := lst.cluster.isOnline(lst.DstNodeID)
+	online, err := lst.cluster.IsOnline(lst.DstNodeID)
 	if err != nil {
 		return false
 	}
@@ -1242,7 +1242,7 @@ func (lst *loadSegmentTask) preExecute(ctx context.Context) error {
 func (lst *loadSegmentTask) execute(ctx context.Context) error {
 	defer lst.reduceRetryCount()

-	err := lst.cluster.loadSegments(ctx, lst.DstNodeID, lst.LoadSegmentsRequest)
+	err := lst.cluster.LoadSegments(ctx, lst.DstNodeID, lst.LoadSegmentsRequest)
 	if err != nil {
 		log.Warn("loadSegmentTask: loadSegment occur error", zap.Int64("taskID", lst.getTaskID()))
 		lst.setResultInfo(err)
@@ -1322,7 +1322,7 @@ func (rst *releaseSegmentTask) marshal() ([]byte, error) {
 }

 func (rst *releaseSegmentTask) isValid() bool {
-	online, err := rst.cluster.isOnline(rst.NodeID)
+	online, err := rst.cluster.IsOnline(rst.NodeID)
 	if err != nil {
 		return false
 	}
@@ -1350,7 +1350,7 @@ func (rst *releaseSegmentTask) preExecute(context.Context) error {
 func (rst *releaseSegmentTask) execute(ctx context.Context) error {
 	defer rst.reduceRetryCount()

-	err := rst.cluster.releaseSegments(rst.ctx, rst.leaderID, rst.ReleaseSegmentsRequest)
+	err := rst.cluster.ReleaseSegments(rst.ctx, rst.leaderID, rst.ReleaseSegmentsRequest)
 	if err != nil {
 		log.Warn("releaseSegmentTask: releaseSegment occur error", zap.Int64("taskID", rst.getTaskID()))
 		rst.setResultInfo(err)
@@ -1388,7 +1388,7 @@ func (wdt *watchDmChannelTask) marshal() ([]byte, error) {
 }

 func (wdt *watchDmChannelTask) isValid() bool {
-	online, err := wdt.cluster.isOnline(wdt.NodeID)
+	online, err := wdt.cluster.IsOnline(wdt.NodeID)
 	if err != nil {
 		return false
 	}
@@ -1429,7 +1429,7 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error {
 func (wdt *watchDmChannelTask) execute(ctx context.Context) error {
 	defer wdt.reduceRetryCount()

-	err := wdt.cluster.watchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest)
+	err := wdt.cluster.WatchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest)
 	if err != nil {
 		log.Warn("watchDmChannelTask: watchDmChannel occur error", zap.Int64("taskID", wdt.getTaskID()))
 		wdt.setResultInfo(err)
@@ -1502,7 +1502,7 @@ func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) {
 }

 func (wdt *watchDeltaChannelTask) isValid() bool {
-	online, err := wdt.cluster.isOnline(wdt.NodeID)
+	online, err := wdt.cluster.IsOnline(wdt.NodeID)
 	if err != nil {
 		return false
 	}
@@ -1544,7 +1544,7 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error {
 func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error {
 	defer wdt.reduceRetryCount()

-	err := wdt.cluster.watchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest)
+	err := wdt.cluster.WatchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest)
 	if err != nil {
 		log.Warn("watchDeltaChannelTask: watchDeltaChannel occur error", zap.Int64("taskID", wdt.getTaskID()), zap.Error(err))
 		wdt.setResultInfo(err)
@@ -2042,7 +2042,7 @@ func (lbt *loadBalanceTask) processManualLoadBalance(ctx context.Context) error
 	balancedSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
 	balancedSegmentIDs := make([]UniqueID, 0)
 	for _, nodeID := range lbt.SourceNodeIDs {
-		nodeExist := lbt.cluster.hasNode(nodeID)
+		nodeExist := lbt.cluster.HasNode(nodeID)
 		if !nodeExist {
 			err := fmt.Errorf("loadBalanceTask: query node %d is not exist to balance", nodeID)
 			log.Error(err.Error())
@@ -2302,7 +2302,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		// then the queryCoord will panic, and the nodeInfo should not be removed immediately
 		// after queryCoord recovery, the balanceTask will redo
 		for _, offlineNodeID := range lbt.SourceNodeIDs {
-			err := lbt.cluster.removeNodeInfo(offlineNodeID)
+			err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
 			if err != nil {
 				log.Error("loadBalanceTask: occur error when removing node info from cluster",
 					zap.Int64("nodeID", offlineNodeID),
@@ -2345,7 +2345,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 			leaderID := task.NodeID
 			dmChannel := task.Infos[0].ChannelName

-			nodeInfo, err := lbt.cluster.getNodeInfoByID(leaderID)
+			nodeInfo, err := lbt.cluster.GetNodeInfoByID(leaderID)
 			if err != nil {
 				log.Error("failed to get node info to update shard leader info",
 					zap.Int64("triggerTaskID", lbt.getTaskID()),
@@ -2399,14 +2399,14 @@ func assignInternalTask(ctx context.Context,
 	broker *globalMetaBroker) ([]task, error) {

 	internalTasks := make([]task, 0)
-	err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
+	err := cluster.AllocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
 	if err != nil {
 		log.Error("assignInternalTask: assign segment to node failed", zap.Error(err))
 		return nil, err
 	}
 	log.Info("assignInternalTask: assign segment to node success", zap.Int("load segments", len(loadSegmentRequests)))

-	err = cluster.allocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
+	err = cluster.AllocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
 	if err != nil {
 		log.Error("assignInternalTask: assign dmChannel to node failed", zap.Error(err))
 		return nil, err
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index abd865b757..452b93f434 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -1019,7 +1019,7 @@ func generateDerivedInternalTasks(triggerTask task, meta Meta, cluster Cluster)
 			collectionID := loadSegmentTask.CollectionID
 			replicaID := loadSegmentTask.GetReplicaID()
 			nodeID := loadSegmentTask.DstNodeID
-			if !cluster.hasWatchedDeltaChannel(triggerTask.traceCtx(), nodeID, collectionID) {
+			if !cluster.HasWatchedDeltaChannel(triggerTask.traceCtx(), nodeID, collectionID) {
 				addChannelWatchInfoFn(nodeID, collectionID, replicaID, watchDeltaChannelInfo)
 			}
 		}
diff --git a/internal/querycoord/util.go b/internal/querycoord/util.go
index 81f8ce862b..bb5a9255c8 100644
--- a/internal/querycoord/util.go
+++ b/internal/querycoord/util.go
@@ -188,7 +188,7 @@ func syncReplicaSegments(ctx context.Context, cluster Cluster, childTasks []task
 			}
 		}

-		err := cluster.syncReplicaSegments(ctx, leader.LeaderID, &req)
+		err := cluster.SyncReplicaSegments(ctx, leader.LeaderID, &req)
 		if err != nil {
 			return err
 		}