From 7039fb7c8212a79b06de05bb6f50ac8f24809e45 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 22 Jun 2021 14:10:09 +0800 Subject: [PATCH] Fix the problem of stuck after loadBalance and loadFieldData error (#5960) * delete nodeInfo after nodeDown Signed-off-by: xige-16 * fix load balance can't stop Signed-off-by: xige-16 * fix load field data error Signed-off-by: xige-16 * continue task loop after error in queryService Signed-off-by: xige-16 --- internal/distributed/querynode/client/client.go | 4 ++++ internal/querynode/collection_replica.go | 11 +++++++++++ internal/querynode/impl.go | 3 +++ internal/querynode/segment_loader.go | 11 +++++------ internal/queryservice/cluster.go | 17 ++++++++++++++++- internal/queryservice/querynode.go | 11 +++++++++++ internal/queryservice/queryservice.go | 2 ++ internal/queryservice/task.go | 6 ++++++ internal/queryservice/task_scheduler.go | 13 +++++++++---- 9 files changed, 67 insertions(+), 11 deletions(-) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 7c338766eb..b1b1c882d1 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -17,6 +17,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -105,6 +106,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + if c.conn.GetState() == connectivity.Shutdown { + return ret, err + } for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 4469e0975b..03da3f13ac 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -84,6 +84,7 @@ type ReplicaInterface interface { 
replaceGrowingSegmentBySealedSegment(segment *Segment) error freeAll() + printReplica() } type collectionReplica struct { @@ -99,6 +100,16 @@ type collectionReplica struct { etcdKV *etcdkv.EtcdKV } +func (colReplica *collectionReplica) printReplica() { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + + log.Debug("collections in collectionReplica", zap.Any("info", colReplica.collections)) + log.Debug("partitions in collectionReplica", zap.Any("info", colReplica.partitions)) + log.Debug("segments in collectionReplica", zap.Any("info", colReplica.segments)) + log.Debug("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments)) +} + //----------------------------------------------------------------------------------------------------- collection func (colReplica *collectionReplica) getCollectionIDs() []UniqueID { colReplica.mu.RLock() diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index a01e7fc06b..e525b8f35a 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -374,6 +374,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen return info } // get info from historical + node.historical.replica.printReplica() for _, id := range in.SegmentIDs { log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id)) segment, err := node.historical.replica.getSegmentByID(id) @@ -387,8 +388,10 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen infos = append(infos, info) } // get info from streaming + node.streaming.replica.printReplica() for _, id := range in.SegmentIDs { log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id)) + segment, err := node.streaming.replica.getSegmentByID(id) if err != nil { log.Debug("QueryNode::Impl::GetSegmentInfo, for streaming segmentID not exist", zap.Any("SegmentID", id)) diff --git a/internal/querynode/segment_loader.go 
b/internal/querynode/segment_loader.go index a4be62e6a1..076e6db306 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -234,19 +234,18 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths zap.Any("fieldID", fieldID), zap.String("paths", fmt.Sprintln(paths)), ) - blob := &storage.Blob{ - Key: strconv.FormatInt(fieldID, 10), - Value: make([]byte, 0), - } for _, path := range paths { binLog, err := loader.minioKV.Load(path) if err != nil { // TODO: return or continue? return err } - blob.Value = append(blob.Value, []byte(binLog)...) + blob := &storage.Blob{ + Key: strconv.FormatInt(fieldID, 10), + Value: []byte(binLog), + } + blobs = append(blobs, blob) } - blobs = append(blobs, blob) } _, _, insertData, err := iCodec.Deserialize(blobs) diff --git a/internal/queryservice/cluster.go b/internal/queryservice/cluster.go index 3485fde320..e5bb10c338 100644 --- a/internal/queryservice/cluster.go +++ b/internal/queryservice/cluster.go @@ -406,8 +406,23 @@ func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueI } func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error { + c.Lock() + defer c.Unlock() + key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID) - return c.client.Remove(key) + err := c.client.Remove(key) + if err != nil { + return err + } + + err = c.nodes[nodeID].clearNodeInfo() + if err != nil { + return err + } + delete(c.nodes, nodeID) + log.Debug("delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID)) + + return nil } func (c *queryNodeCluster) onServiceNodeIDs() ([]int64, error) { diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go index 3e73d3363b..bb86e18eda 100644 --- a/internal/queryservice/querynode.go +++ b/internal/queryservice/querynode.go @@ -310,6 +310,17 @@ func (qn *queryNode) removeCollectionInfo(collectionID UniqueID) error { return qn.kvClient.Remove(key) } +func (qn *queryNode) 
clearNodeInfo() error { + for collectionID := range qn.collectionInfos { + err := qn.removeCollectionInfo(collectionID) + if err != nil { + return err + } + } + + return nil +} + func (qn *queryNode) setNodeState(onService bool) { qn.Lock() defer qn.Unlock() diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 6636f2beb2..ff11f65240 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -194,6 +194,7 @@ func (qs *QueryService) watchNodeLoop() { for nodeID := range qs.cluster.nodes { if _, ok := sessionMap[nodeID]; !ok { qs.cluster.nodes[nodeID].setNodeState(false) + qs.cluster.nodes[nodeID].client.Stop() loadBalanceSegment := &querypb.LoadBalanceRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadBalanceSegments, @@ -236,6 +237,7 @@ func (qs *QueryService) watchNodeLoop() { serverID := event.Session.ServerID log.Debug("QueryService", zap.Any("The QueryNode crashed with ID", serverID)) qs.cluster.nodes[serverID].setNodeState(false) + qs.cluster.nodes[serverID].client.Stop() loadBalanceSegment := &querypb.LoadBalanceRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadBalanceSegments, diff --git a/internal/queryservice/task.go b/internal/queryservice/task.go index 5348a33b70..1ad5b8674d 100644 --- a/internal/queryservice/task.go +++ b/internal/queryservice/task.go @@ -1273,6 +1273,12 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error { } func (lbt *LoadBalanceTask) PostExecute(ctx context.Context) error { + for _, id := range lbt.SourceNodeIDs { + err := lbt.cluster.removeNodeInfo(id) + if err != nil { + log.Error("LoadBalanceTask: remove mode info error", zap.Int64("nodeID", id)) + } + } log.Debug("LoadBalanceTask postExecute done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), diff --git a/internal/queryservice/task_scheduler.go b/internal/queryservice/task_scheduler.go index 
d740fbd733..5580a8e300 100644 --- a/internal/queryservice/task_scheduler.go +++ b/internal/queryservice/task_scheduler.go @@ -504,8 +504,9 @@ func (scheduler *TaskScheduler) processTask(t task) error { func (scheduler *TaskScheduler) scheduleLoop() { defer scheduler.wg.Done() activeTaskWg := &sync.WaitGroup{} - var err error = nil + for { + var err error = nil select { case <-scheduler.ctx.Done(): return @@ -516,6 +517,7 @@ func (scheduler *TaskScheduler) scheduleLoop() { err = scheduler.processTask(t) if err != nil { log.Error("scheduleLoop: process task error", zap.Any("error", err.Error())) + t.Notify(err) continue } if t.Type() == commonpb.MsgType_LoadCollection || t.Type() == commonpb.MsgType_LoadPartitions { @@ -532,9 +534,9 @@ func (scheduler *TaskScheduler) scheduleLoop() { } } activeTaskWg.Wait() - if t.Type() == commonpb.MsgType_ReleaseCollection || t.Type() == commonpb.MsgType_ReleasePartitions { - t.Notify(err) - } + //if t.Type() == commonpb.MsgType_ReleaseCollection || t.Type() == commonpb.MsgType_ReleasePartitions { + // t.Notify(err) + //} keys := make([]string, 0) taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, t.ID()) stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID()) @@ -543,8 +545,11 @@ func (scheduler *TaskScheduler) scheduleLoop() { err = scheduler.client.MultiRemove(keys) if err != nil { log.Error("scheduleLoop: error when remove trigger task to etcd", zap.Int64("taskID", t.ID())) + t.Notify(err) + continue } log.Debug("scheduleLoop: trigger task done and delete from etcd", zap.Int64("taskID", t.ID())) + t.Notify(err) } } }