From 5124ed97588c60b3f0b39afe254ae7c65ae5b409 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 22 Jul 2025 12:37:01 +0800 Subject: [PATCH] fix: Fix import fileStats incorrectly set to nil (#43463) 1. Ensure that tasks in the InProgress state return valid fileStats. 2. Enhance import logs. issue: https://github.com/milvus-io/milvus/issues/43387 --------- Signed-off-by: bigsheeper --- internal/datacoord/import_checker.go | 4 +- internal/datacoord/import_task_import.go | 35 +++++----- internal/datacoord/import_task_preimport.go | 15 ++-- internal/datacoord/session/cluster.go | 24 +++---- internal/datanode/importv2/task_import.go | 8 ++- internal/datanode/importv2/task_l0_import.go | 8 ++- .../datanode/importv2/task_l0_preimport.go | 7 +- internal/datanode/importv2/task_preimport.go | 7 +- internal/datanode/services.go | 70 +++++++++++++------ 9 files changed, 116 insertions(+), 62 deletions(-) diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index d3f07507c9..82cad5d21d 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -228,7 +228,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) { log.Warn("add preimport task failed", WrapTaskLog(t, zap.Error(err))...) return } - log.Info("add new preimport task", WrapTaskLog(t)...) + log.Info("add new preimport task", WrapTaskLog(t, zap.Any("fileStats", t.GetFileStats()))...) } err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) @@ -300,7 +300,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { updateJobState(internalpb.ImportJobState_Failed, UpdateJobReason(err.Error())) return } - log.Info("add new import task", WrapTaskLog(t)...) + log.Info("add new import task", WrapTaskLog(t, zap.Any("fileStats", t.GetFileStats()))...) } updateJobState(internalpb.ImportJobState_Importing, UpdateRequestedDiskSize(requestSize)) diff --git a/internal/datacoord/import_task_import.go b/internal/datacoord/import_task_import.go index d76e6a47b9..3bb54d3287 100644 --- a/internal/datacoord/import_task_import.go +++ b/internal/datacoord/import_task_import.go @@ -183,23 +183,26 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { dbName = collInfo.DatabaseName } - for _, info := range resp.GetImportSegmentsInfo() { - segment := t.meta.GetSegment(context.TODO(), info.GetSegmentID()) - if info.GetImportedRows() <= segment.GetNumOfRows() { - continue // rows not changed, no need to update - } - diff := info.GetImportedRows() - segment.GetNumOfRows() - op := UpdateImportedRows(info.GetSegmentID(), info.GetImportedRows()) - err = t.meta.UpdateSegmentsInfo(context.TODO(), op) - if err != nil { - log.Warn("update import segment rows failed", WrapTaskLog(t, zap.Error(err))...) - return - } + if resp.GetState() == datapb.ImportTaskStateV2_InProgress || resp.GetState() == datapb.ImportTaskStateV2_Completed { + for _, info := range resp.GetImportSegmentsInfo() { + segment := t.meta.GetSegment(context.TODO(), info.GetSegmentID()) + if info.GetImportedRows() <= segment.GetNumOfRows() { + continue // rows not changed, no need to update + } + diff := info.GetImportedRows() - segment.GetNumOfRows() + op := UpdateImportedRows(info.GetSegmentID(), info.GetImportedRows()) + err = t.meta.UpdateSegmentsInfo(context.TODO(), op) + if err != nil { + log.Warn("update import segment rows failed", WrapTaskLog(t, zap.Error(err))...) 
+ return + } + log.Info("update import segment rows done", WrapTaskLog(t, zap.Int64("segmentID", info.GetSegmentID()), zap.Int64("importedRows", info.GetImportedRows()))...) - metrics.DataCoordBulkVectors.WithLabelValues( - dbName, - strconv.FormatInt(t.GetCollectionID(), 10), - ).Add(float64(diff)) + metrics.DataCoordBulkVectors.WithLabelValues( + dbName, + strconv.FormatInt(t.GetCollectionID(), 10), + ).Add(float64(diff)) + } } if resp.GetState() == datapb.ImportTaskStateV2_Completed { for _, info := range resp.GetImportSegmentsInfo() { diff --git a/internal/datacoord/import_task_preimport.go b/internal/datacoord/import_task_preimport.go index 7e558fc83a..93644736d3 100644 --- a/internal/datacoord/import_task_preimport.go +++ b/internal/datacoord/import_task_preimport.go @@ -152,14 +152,19 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) { log.Warn("preimport failed", WrapTaskLog(p, zap.String("reason", resp.GetReason()))...) return } - actions := []UpdateAction{UpdateFileStats(resp.GetFileStats())} + actions := []UpdateAction{} + if resp.GetState() == datapb.ImportTaskStateV2_InProgress || resp.GetState() == datapb.ImportTaskStateV2_Completed { + actions = append(actions, UpdateFileStats(resp.GetFileStats())) + } if resp.GetState() == datapb.ImportTaskStateV2_Completed { actions = append(actions, UpdateState(datapb.ImportTaskStateV2_Completed)) } - err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...) - if err != nil { - log.Warn("update preimport task failed", WrapTaskLog(p, zap.Error(err))...) - return + if len(actions) > 0 { + err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...) + if err != nil { + log.Warn("update preimport task failed", WrapTaskLog(p, zap.Error(err))...) + return + } } log.Info("query preimport", WrapTaskLog(p, zap.String("respState", resp.GetState().String()), zap.Any("fileStats", resp.GetFileStats()))...) 
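
The import_task_preimport.go hunk above is the core of the fix: UpdateFileStats was previously applied unconditionally, and (as the cluster.go hunks below show) QueryPreImport responses for tasks that had not yet reached a payload-bearing state came back with nil fileStats, so polling an early-stage task could overwrite the stats already stored in importMeta. Below is a minimal, self-contained sketch of the guarded update, kept outside the patch proper; taskState, fileStats, and buildActions are hypothetical stand-ins for the datacoord identifiers, not actual Milvus APIs.

    // Sketch only: illustrates the state guard introduced above.
    // All names here are stand-ins, not datacoord types.
    package main

    import "fmt"

    type taskState int

    const (
    	statePending taskState = iota
    	stateInProgress
    	stateCompleted
    )

    type fileStats []string // stand-in for []*datapb.ImportFileStats

    // buildActions mirrors the guarded logic: stats reported by the worker are
    // only persisted once the task is InProgress or Completed; earlier responses
    // may carry a nil slice and must not clobber what is already recorded.
    func buildActions(state taskState, stats fileStats) []string {
    	actions := []string{}
    	if state == stateInProgress || state == stateCompleted {
    		actions = append(actions, fmt.Sprintf("UpdateFileStats(%v)", stats))
    	}
    	if state == stateCompleted {
    		actions = append(actions, "UpdateState(Completed)")
    	}
    	return actions
    }

    func main() {
    	fmt.Println(buildActions(statePending, nil))                    // [] -> nothing persisted, stored stats kept
    	fmt.Println(buildActions(stateInProgress, fileStats{"a.json"})) // stats persisted
    	fmt.Println(buildActions(stateCompleted, fileStats{"a.json"}))  // stats persisted and task marked Completed
    }

With this guard, a poll that finds the task still pending or retrying leaves the previously recorded fileStats untouched instead of replacing them with nil.
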
diff --git a/internal/datacoord/session/cluster.go b/internal/datacoord/session/cluster.go index c8928cb116..fe3cafcdad 100644 --- a/internal/datacoord/session/cluster.go +++ b/internal/datacoord/session/cluster.go @@ -220,9 +220,9 @@ func (c *cluster) QueryCompaction(nodeID int64, in *datapb.CompactionStateReques return nil, err } switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &datapb.CompactionPlanResult{State: taskcommon.ToCompactionState(state)}, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &datapb.CompactionStateResponse{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { @@ -290,9 +290,9 @@ func (c *cluster) QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) } reason := resProperties.GetTaskReason() switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &datapb.QueryPreImportResponse{State: taskcommon.ToImportState(state), Reason: reason}, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &datapb.QueryPreImportResponse{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { @@ -321,9 +321,9 @@ func (c *cluster) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*dat } reason := resProperties.GetTaskReason() switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &datapb.QueryImportResponse{State: taskcommon.ToImportState(state), Reason: reason}, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &datapb.QueryImportResponse{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { @@ -371,7 +371,7 @@ func (c *cluster) QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*work } reason := resProperties.GetTaskReason() switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &workerpb.IndexJobResults{ Results: []*workerpb.IndexTaskInfo{ { @@ -381,7 +381,7 @@ func (c *cluster) QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*work }, }, }, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &workerpb.QueryJobsV2Response{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { @@ -430,7 +430,7 @@ func (c *cluster) QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*work } reason := resProperties.GetTaskReason() switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &workerpb.StatsResults{ Results: []*workerpb.StatsResult{ { @@ -440,7 +440,7 @@ func (c *cluster) QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*work }, }, }, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &workerpb.QueryJobsV2Response{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { @@ -487,7 +487,7 @@ func (c *cluster) QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*wo 
} reason := resProperties.GetTaskReason() switch state { - case taskcommon.None, taskcommon.Init, taskcommon.InProgress, taskcommon.Retry: + case taskcommon.None, taskcommon.Init, taskcommon.Retry: return &workerpb.AnalyzeResults{ Results: []*workerpb.AnalyzeResult{ { @@ -497,7 +497,7 @@ func (c *cluster) QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*wo }, }, }, nil - case taskcommon.Finished, taskcommon.Failed: + case taskcommon.InProgress, taskcommon.Finished, taskcommon.Failed: result := &workerpb.QueryJobsV2Response{} err = proto.Unmarshal(resp.GetPayload(), result) if err != nil { diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 14e0293971..bdf8c4aa6e 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -130,6 +130,10 @@ func (t *ImportTask) Clone() Task { cancel: cancel, segmentsInfo: infos, req: t.req, + allocator: t.allocator, + manager: t.manager, + syncMgr: t.syncMgr, + cm: t.cm, metaCaches: t.metaCaches, } } @@ -139,7 +143,9 @@ func (t *ImportTask) Execute() []*conc.Future[any] { log.Info("start to import", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("schema", t.GetSchema()))...) + zap.Any("files", t.GetFileStats()), + zap.Any("schema", t.GetSchema()), + )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) req := t.req diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 9815ea5b7f..ffc96c13cc 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -127,6 +127,10 @@ func (t *L0ImportTask) Clone() Task { cancel: cancel, segmentsInfo: infos, req: t.req, + allocator: t.allocator, + manager: t.manager, + syncMgr: t.syncMgr, + cm: t.cm, metaCaches: t.metaCaches, } } @@ -136,7 +140,9 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] { log.Info("start to import l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("schema", t.GetSchema()))...) + zap.Any("files", t.GetFileStats()), + zap.Any("schema", t.GetSchema()), + )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) req := t.req diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go index 3bc17ae862..8f085208f5 100644 --- a/internal/datanode/importv2/task_l0_preimport.go +++ b/internal/datanode/importv2/task_l0_preimport.go @@ -117,6 +117,9 @@ func (t *L0PreImportTask) Clone() Task { partitionIDs: t.GetPartitionIDs(), vchannels: t.GetVchannels(), schema: t.GetSchema(), + req: t.req, + manager: t.manager, + cm: t.cm, } } @@ -125,7 +128,9 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] { log.Info("start to preimport l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("schema", t.GetSchema()))...) + zap.Any("files", t.GetFileStats()), + zap.Any("schema", t.GetSchema()), + )...) 
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) files := lo.Map(t.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go index 6ed1b19a86..bdc04132e4 100644 --- a/internal/datanode/importv2/task_preimport.go +++ b/internal/datanode/importv2/task_preimport.go @@ -125,6 +125,9 @@ func (t *PreImportTask) Clone() Task { vchannels: t.GetVchannels(), schema: t.GetSchema(), options: t.options, + req: t.req, + manager: t.manager, + cm: t.cm, } } @@ -133,7 +136,9 @@ func (t *PreImportTask) Execute() []*conc.Future[any] { log.Info("start to preimport", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("schema", t.GetSchema()))...) + zap.Any("files", t.GetFileStats()), + zap.Any("schema", t.GetSchema()), + )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) files := lo.Map(t.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 3939c186f5..e9df938dda 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -372,8 +372,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( } func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) { - log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()), - zap.Int64("jobID", req.GetJobID())) + log := log.Ctx(ctx).WithRateGroup("datanode.QueryPreImport", 1, 60) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &datapb.QueryPreImportResponse{Status: merr.Status(err)}, nil @@ -384,22 +383,34 @@ func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreIm Status: merr.Status(importv2.WrapTaskNotFoundError(req.GetTaskID())), }, nil } - log.RatedInfo(10, "datanode query preimport", zap.String("state", task.GetState().String()), - zap.String("reason", task.GetReason())) + fileStats := task.(interface { + GetFileStats() []*datapb.ImportFileStats + }).GetFileStats() + logFields := []zap.Field{ + zap.Int64("taskID", task.GetTaskID()), + zap.Int64("jobID", task.GetJobID()), + zap.String("state", task.GetState().String()), + zap.String("reason", task.GetReason()), + zap.Int64("nodeID", node.GetNodeID()), + zap.Any("fileStats", fileStats), + } + if task.GetState() == datapb.ImportTaskStateV2_InProgress { + log.RatedInfo(30, "datanode query preimport", logFields...) + } else { + log.Info("datanode query preimport", logFields...) 
+ } + return &datapb.QueryPreImportResponse{ - Status: merr.Success(), - TaskID: task.GetTaskID(), - State: task.GetState(), - Reason: task.GetReason(), - FileStats: task.(interface { - GetFileStats() []*datapb.ImportFileStats - }).GetFileStats(), + Status: merr.Success(), + TaskID: task.GetTaskID(), + State: task.GetState(), + Reason: task.GetReason(), + FileStats: fileStats, }, nil } func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) { - log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()), - zap.Int64("jobID", req.GetJobID())) + log := log.Ctx(ctx).WithRateGroup("datanode.QueryImport", 1, 60) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &datapb.QueryImportResponse{Status: merr.Status(err)}, nil @@ -420,22 +431,35 @@ func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRe Status: merr.Status(importv2.WrapTaskNotFoundError(req.GetTaskID())), }, nil } - log.RatedInfo(10, "datanode query import", zap.String("state", task.GetState().String()), - zap.String("reason", task.GetReason())) + segmentsInfo := task.(interface { + GetSegmentsInfo() []*datapb.ImportSegmentInfo + }).GetSegmentsInfo() + logFields := []zap.Field{ + zap.Int64("taskID", task.GetTaskID()), + zap.Int64("jobID", task.GetJobID()), + zap.String("state", task.GetState().String()), + zap.String("reason", task.GetReason()), + zap.Int64("nodeID", node.GetNodeID()), + zap.Any("segmentsInfo", segmentsInfo), + } + if task.GetState() == datapb.ImportTaskStateV2_InProgress { + log.RatedInfo(30, "datanode query import", logFields...) + } else { + log.Info("datanode query import", logFields...) + } return &datapb.QueryImportResponse{ - Status: merr.Success(), - TaskID: task.GetTaskID(), - State: task.GetState(), - Reason: task.GetReason(), - ImportSegmentsInfo: task.(interface { - GetSegmentsInfo() []*datapb.ImportSegmentInfo - }).GetSegmentsInfo(), + Status: merr.Success(), + TaskID: task.GetTaskID(), + State: task.GetState(), + Reason: task.GetReason(), + ImportSegmentsInfo: segmentsInfo, }, nil } func (node *DataNode) DropImport(ctx context.Context, req *datapb.DropImportRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()), - zap.Int64("jobID", req.GetJobID())) + zap.Int64("jobID", req.GetJobID()), + zap.Int64("nodeID", node.GetNodeID())) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return merr.Status(err), nil