From df0ffd08ce48b9ff389b1803c13283b7ca750348 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 11 Jan 2023 11:55:39 +0800 Subject: [PATCH] Update segment id for import task (#21621) Signed-off-by: groot --- internal/datanode/services.go | 26 ++++++++++++++++++++++++++ internal/rootcoord/import_manager.go | 9 ++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 0dcc77f834..4c576cfe90 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -713,6 +713,32 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil zap.Int64("segmentID", segmentID), zap.Int("shard ID", shardID), zap.String("target channel name", targetChName)) + + // call report to notify the rootcoord update the segment id list for this task + // ignore the returned error, since even report failed the segments still can be cleaned + retry.Do(context.Background(), func() error { + importResult := &rootcoordpb.ImportResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + TaskId: req.GetImportTask().TaskId, + DatanodeId: paramtable.GetNodeID(), + State: commonpb.ImportState_ImportStarted, + Segments: []int64{segmentID}, + AutoIds: make([]int64, 0), + RowCount: 0, + } + status, err := node.rootCoord.ReportImport(context.Background(), importResult) + if err != nil { + log.Error("fail to report import state to RootCoord", zap.Error(err)) + return err + } + if status != nil && status.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(status.GetReason()) + } + return nil + }) + return segmentID, targetChName, nil } } diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 9e32bd3aaf..b8cf500375 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -560,7 +560,12 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im // Meta persist should be done before memory objs change. toPersistImportTaskInfo = cloneImportTaskInfo(v) toPersistImportTaskInfo.State.StateCode = ir.GetState() - toPersistImportTaskInfo.State.Segments = ir.GetSegments() + // if is started state, append the new created segment id + if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted { + toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...) + } else { + toPersistImportTaskInfo.State.Segments = ir.GetSegments() + } toPersistImportTaskInfo.State.RowCount = ir.GetRowCount() toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds() for _, kv := range ir.GetInfos() { @@ -571,6 +576,8 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv) } } + log.Info("importManager update task info", zap.Any("toPersistImportTaskInfo", toPersistImportTaskInfo)) + // Update task in task store. if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil { log.Error("failed to update import task",