diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 9cb0d2d560..bce9927793 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -966,7 +966,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
 
 // GetFlushState gets the flush state of multiple segments
 func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
-	log.Info("received get flush state request", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
+	log.Info("DataCoord receive get flush state request", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
 	resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
 	if s.isClosed() {
@@ -1003,7 +1003,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
 // Import distributes the import tasks to dataNodes.
 // It returns a failed status if no dataNode is available or if any error occurs.
 func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
-	log.Info("received import request", zap.Any("import task request", itr))
+	log.Info("DataCoord receive import request", zap.Any("import task request", itr))
 	resp := &datapb.ImportTaskResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -1021,19 +1021,20 @@ func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*da
 		log.Error("import failed as all dataNodes are offline")
 		return resp, nil
 	}
+
 	avaNodes := getDiff(nodes, itr.GetWorkingNodes())
 	if len(avaNodes) > 0 {
 		// If there exists available DataNodes, pick one at random.
-		dnID := avaNodes[rand.Intn(len(avaNodes))]
+		resp.DatanodeId = avaNodes[rand.Intn(len(avaNodes))]
 		log.Info("picking a free dataNode",
 			zap.Any("all dataNodes", nodes),
-			zap.Int64("picking free dataNode with ID", dnID))
-		s.cluster.Import(s.ctx, dnID, itr)
+			zap.Int64("picking free dataNode with ID", resp.GetDatanodeId()))
+		s.cluster.Import(s.ctx, resp.GetDatanodeId(), itr)
 	} else {
 		// No dataNode is available, reject the import request.
-		errMsg := "all dataNodes are busy working on data import, please try again later or add new dataNode instances"
-		log.Error(errMsg, zap.Int64("task ID", itr.GetImportTask().GetTaskId()))
-		resp.Status.Reason = errMsg
+		msg := "all dataNodes are busy working on data import, the task has been rejected and wait for idle datanode"
+		log.Info(msg, zap.Int64("task ID", itr.GetImportTask().GetTaskId()))
+		resp.Status.Reason = msg
 		return resp, nil
 	}
 
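Note: getDiff is called in the hunk above to compute which online DataNodes are not already listed as working nodes for import, but the helper itself is outside this diff. A minimal sketch of such a set-difference helper, with an illustrative name and signature rather than the actual DataCoord one:

package datacoord

// getDiffSketch returns the IDs in all that do not appear in working.
// A map lookup keeps this O(len(all) + len(working)) instead of the
// quadratic nested-loop alternative.
func getDiffSketch(all, working []int64) []int64 {
	busy := make(map[int64]struct{}, len(working))
	for _, id := range working {
		busy[id] = struct{}{}
	}
	free := make([]int64, 0, len(all))
	for _, id := range all {
		if _, ok := busy[id]; !ok {
			free = append(free, id)
		}
	}
	return free
}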
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 681f82fabb..bc75b9e7ee 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -796,19 +796,22 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
 
 // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
 func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) {
-	log.Info("receive import request",
+	log.Info("DataNode receive import request",
 		zap.Int64("task ID", req.GetImportTask().GetTaskId()),
 		zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
 		zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
 		zap.Any("channel names", req.GetImportTask().GetChannelNames()),
 		zap.Any("working dataNodes", req.WorkingNodes))
+	defer func() {
+		log.Info("DataNode finish import request", zap.Int64("task ID", req.GetImportTask().GetTaskId()))
+	}()
 	importResult := &rootcoordpb.ImportResult{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
 		},
 		TaskId:     req.GetImportTask().TaskId,
-		DatanodeId: Params.DataNodeCfg.GetNodeID(),
+		DatanodeId: node.NodeID,
 		State:      commonpb.ImportState_ImportStarted,
 		Segments:   make([]int64, 0),
 		AutoIds:    make([]int64, 0),
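Note: the defer added above makes the "DataNode finish import request" log fire on every return path of Import, including early error returns, so receive/finish log lines always come in pairs for a task. A standalone sketch of the pattern (handleImport and its logging are illustrative, not DataNode methods):

package main

import (
	"errors"
	"log"
)

func handleImport(taskID int64) error {
	log.Printf("receive import request, task ID %d", taskID)
	// Deferred calls run on every return path, so this "finish" line is
	// always emitted once the function exits, however it exits.
	defer log.Printf("finish import request, task ID %d", taskID)

	if taskID < 0 {
		return errors.New("invalid task ID") // the deferred log still fires
	}
	// ... read data files and build sealed segments ...
	return nil
}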
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index d5b1990d44..19eb4b80e3 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -2335,7 +2335,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
 			zap.Error(err))
 		return nil, err
 	}
-	log.Info("receive import request",
+	log.Info("RootCoord receive import request",
 		zap.String("collection name", req.GetCollectionName()),
 		zap.Int64("collection ID", cID),
 		zap.String("partition name", req.GetPartitionName()),
@@ -2376,7 +2376,7 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask
 
 // ReportImport reports import task state to RootCoord.
 func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (*commonpb.Status, error) {
-	log.Info("receive import state report",
+	log.Info("RootCoord receive import state report",
 		zap.Int64("task ID", ir.GetTaskId()),
 		zap.Any("import state", ir.GetState()))
 	if code, ok := c.checkHealthy(); !ok {
@@ -2391,6 +2391,25 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
 		}, nil
 	}
 
+	// resendTaskFunc marks the reporting DataNode as idle again and sends pending import tasks out to idle DataNodes.
+	resendTaskFunc := func() {
+		func() {
+			c.importManager.busyNodesLock.Lock()
+			defer c.importManager.busyNodesLock.Unlock()
+			delete(c.importManager.busyNodes, ir.GetDatanodeId())
+			log.Info("DataNode is no longer busy",
+				zap.Int64("dataNode ID", ir.GetDatanodeId()),
+				zap.Int64("task ID", ir.GetTaskId()))
+
+		}()
+		c.importManager.sendOutTasks(c.importManager.ctx)
+	}
+
+	// If the task failed, free its DataNode and resend pending tasks to an idle DataNode.
+	if ir.GetState() == commonpb.ImportState_ImportFailed {
+		resendTaskFunc()
+	}
+
 	// So much for reporting, unless the task just reached `ImportPersisted` state.
 	if ir.GetState() != commonpb.ImportState_ImportPersisted {
 		log.Debug("non import-persisted state received, return immediately",
@@ -2415,15 +2434,8 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
 	}
 	colName = colMeta.GetSchema().GetName()
 
-	// When DataNode has done its thing, remove it from the busy node list.
-	func() {
-		c.importManager.busyNodesLock.Lock()
-		defer c.importManager.busyNodesLock.Unlock()
-		delete(c.importManager.busyNodes, ir.GetDatanodeId())
-		log.Info("dataNode is no longer busy",
-			zap.Int64("dataNode ID", ir.GetDatanodeId()),
-			zap.Int64("task ID", ir.GetTaskId()))
-	}()
+	// When the DataNode has done its part, remove it from the busy node list and send out import tasks again.
+	resendTaskFunc()
 
 	// Flush all import data segments.
 	c.CallFlushOnCollection(ctx, ti.GetCollectionId(), ir.GetSegments())
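Note: resendTaskFunc relies on importManager.sendOutTasks, which is not part of this diff. A minimal sketch of the scheduling it implies, matching the DataCoord side above that rejects a task when every DataNode is busy: tasks queue up, and each report from a DataNode (failure or persistence) frees that node and triggers another dispatch pass. Field and type names here are illustrative stand-ins, and the busyNodesLock guarding is elided for brevity:

package rootcoord

import "context"

type importManagerSketch struct {
	pendingTasks []int64        // queued import task IDs
	busyNodes    map[int64]bool // DataNode ID -> currently importing
	allNodes     []int64        // all online DataNode IDs
	callImport   func(ctx context.Context, nodeID, taskID int64)
}

// sendOutTasks hands queued tasks to idle DataNodes. When every node is
// busy it returns without dispatching; the next ReportImport calls
// resendTaskFunc, which frees that node and re-enters this loop.
func (m *importManagerSketch) sendOutTasks(ctx context.Context) {
	for len(m.pendingTasks) > 0 {
		idle := int64(-1)
		for _, id := range m.allNodes {
			if !m.busyNodes[id] {
				idle = id
				break
			}
		}
		if idle < 0 {
			return // all DataNodes busy; retry on the next report
		}
		task := m.pendingTasks[0]
		m.pendingTasks = m.pendingTasks[1:]
		m.busyNodes[idle] = true // busy until the node reports back
		go m.callImport(ctx, idle, task)
	}
}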