From 8b8df0a5e9942b16c44a401ce295233db2c3ec9e Mon Sep 17 00:00:00 2001
From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com>
Date: Fri, 14 Oct 2022 15:15:24 +0800
Subject: [PATCH] Do not use DataCoord context when DataNode is handling
 import task (#19732)

So that when DataCoord is done, DataNode can still proceed with the import task.

/kind bug

issue: #19730

Signed-off-by: Yuchen Gao
---
 internal/datanode/data_node.go             | 59 ++++++++++++++--------
 internal/datanode/data_node_test.go        | 12 +++--
 internal/datanode/mock_test.go             |  4 +-
 internal/util/importutil/import_wrapper.go |  7 ++-
 4 files changed, 53 insertions(+), 29 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 621659ce6f..5a7f0d5c7b 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -78,6 +78,9 @@ const (
 
 	// ConnectEtcdMaxRetryTime is used to limit the max retry time for connection etcd
 	ConnectEtcdMaxRetryTime = 100
+
+	// ImportCallTimeout is the timeout used in Import() method calls.
+	ImportCallTimeout = 30 * time.Second
 )
 
 var getFlowGraphServiceAttempts = uint(50)
@@ -95,14 +98,15 @@ var rateCol *rateCollector
 // services in datanode package.
 //
 // DataNode implements `types.Component`, `types.DataNode` interfaces.
-// `etcdCli` is a connection of etcd
-// `rootCoord` is a grpc client of root coordinator.
-// `dataCoord` is a grpc client of data service.
-// `NodeID` is unique to each datanode.
-// `State` is current statement of this data node, indicating whether it's healthy.
 //
-// `clearSignal` is a signal channel for releasing the flowgraph resources.
-// `segmentCache` stores all flushing and flushed segments.
+// `etcdCli` is the connection to etcd.
+// `rootCoord` is a grpc client of root coordinator.
+// `dataCoord` is a grpc client of data service.
+// `NodeID` is unique to each datanode.
+// `State` is the current state of this data node, indicating whether it's healthy.
+//
+// `clearSignal` is a signal channel for releasing the flowgraph resources.
+// `segmentCache` stores all flushing and flushed segments.
 type DataNode struct {
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -570,10 +574,11 @@ func (node *DataNode) ReadyToFlush() error {
 }
 
 // FlushSegments packs flush messages into flowGraph through flushChan.
-// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
-// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
 //
-// One precondition: The segmentID in req is in ascending order.
+// Once DataNode receives a valid segment to flush, repeated flush messages for the same segment are ignored;
+// after a call to flush segment A is received, DataNode guarantees that segment A gets flushed.
+//
+// One precondition: the segmentIDs in req are in ascending order.
 func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
 	metrics.DataNodeFlushReqCounter.WithLabelValues(
 		fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
@@ -719,7 +724,7 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
 	}, nil
 }
 
-//ShowConfigurations returns the configurations of DataNode matching req.Pattern
+// ShowConfigurations returns the configurations of DataNode matching req.Pattern
 func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
 	log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern))
 	if !node.isHealthy() {
@@ -962,11 +967,15 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		AutoIds:  make([]int64, 0),
 		RowCount: 0,
 	}
-	// func to report import state to rootcoord
+
+	// Spawn a new context to ignore cancellation from the parent context.
+	newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout)
+	defer cancel()
+	// func to report import state to RootCoord.
 	reportFunc := func(res *rootcoordpb.ImportResult) error {
-		status, err := node.rootCoord.ReportImport(ctx, res)
+		status, err := node.rootCoord.ReportImport(newCtx, res)
 		if err != nil {
-			log.Error("fail to report import state to root coord", zap.Error(err))
+			log.Error("failed to report import state to RootCoord", zap.Error(err))
 			return err
 		}
 		if status != nil && status.ErrorCode != commonpb.ErrorCode_Success {
@@ -979,7 +988,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		log.Warn("DataNode import failed",
 			zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
 			zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
-			zap.Int64("taskID", req.GetImportTask().GetTaskId()),
+			zap.Int64("task ID", req.GetImportTask().GetTaskId()),
 			zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.GetNodeID())))
 
 		return &commonpb.Status{
@@ -989,7 +998,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 	}
 
 	// get a timestamp for all the rows
-	rep, err := node.rootCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
+	// Ignore cancellation from the parent context.
+	rep, err := node.rootCoord.AllocTimestamp(newCtx, &rootcoordpb.AllocTimestampRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_RequestTSO,
 			MsgID:   0,
@@ -1005,7 +1015,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		importResult.State = commonpb.ImportState_ImportFailed
 		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
 		if reportErr := reportFunc(importResult); reportErr != nil {
-			log.Warn("fail to report import state to root coord", zap.Error(reportErr))
+			log.Warn("failed to report import state to RootCoord", zap.Error(reportErr))
 		}
 		if err != nil {
 			return &commonpb.Status{
@@ -1019,13 +1029,17 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 
 	// get collection schema and shard number
 	metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId())
-	colInfo, err := metaService.getCollectionInfo(ctx, req.GetImportTask().GetCollectionId(), 0)
+	colInfo, err := metaService.getCollectionInfo(newCtx, req.GetImportTask().GetCollectionId(), 0)
 	if err != nil {
+		log.Warn("failed to get collection info",
+			zap.Int64("task ID", req.GetImportTask().GetTaskId()),
+			zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
+			zap.Error(err))
 		importResult.State = commonpb.ImportState_ImportFailed
 		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
 		reportErr := reportFunc(importResult)
 		if reportErr != nil {
-			log.Warn("fail to report import state to root coord", zap.Error(err))
+			log.Warn("failed to report import state to RootCoord", zap.Error(reportErr))
 		}
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -1035,15 +1049,18 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 
 	// parse files and generate segments
 	segmentSize := int64(Params.DataCoordCfg.SegmentMaxSize) * 1024 * 1024
-	importWrapper := importutil.NewImportWrapper(ctx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, node.chunkManager,
+	importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, node.chunkManager,
 		importFlushReqFunc(node, req, importResult, colInfo.GetSchema(), ts), importResult, reportFunc)
 	err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
 	if err != nil {
+		log.Warn("import wrapper failed to parse import request",
+			zap.Int64("task ID", req.GetImportTask().GetTaskId()),
+			zap.Error(err))
 		importResult.State = commonpb.ImportState_ImportFailed
 		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
 		reportErr := reportFunc(importResult)
 		if reportErr != nil {
-			log.Warn("fail to report import state to root coord", zap.Error(err))
+			log.Warn("failed to report import state to RootCoord", zap.Error(reportErr))
 		}
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index 56b608c259..33186232c8 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -43,6 +43,7 @@ import (
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/dependency"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/stretchr/testify/assert" @@ -76,6 +77,8 @@ func TestMain(t *testing.M) { } func TestDataNode(t *testing.T) { + importutil.ReportImportAttempts = 1 + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -465,7 +468,7 @@ func TestDataNode(t *testing.T) { }, } node.rootCoord.(*RootCoordFactory).ReportImportErr = true - _, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req) + _, err = node.Import(node.ctx, req) assert.NoError(t, err) node.rootCoord.(*RootCoordFactory).ReportImportErr = false @@ -548,8 +551,9 @@ func TestDataNode(t *testing.T) { t.Run("Test Import report import error", func(t *testing.T) { node.rootCoord = &RootCoordFactory{ - collectionID: 100, - pkType: schemapb.DataType_Int64, + collectionID: 100, + pkType: schemapb.DataType_Int64, + ReportImportErr: true, } content := []byte(`{ "rows":[ @@ -573,7 +577,7 @@ func TestDataNode(t *testing.T) { RowBased: true, }, } - stat, err := node.Import(context.WithValue(node.ctx, ctxKey{}, returnError), req) + stat, err := node.Import(node.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) }) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 6e3c024ad7..bfb128057a 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -279,7 +279,7 @@ func (ds *DataCoordFactory) BroadcastAlteredCollection(ctx context.Context, req func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { if ds.GetSegmentInfosError { - return nil, errors.New("mock error") + return nil, errors.New("mock get segment info error") } if ds.GetSegmentInfosNotSuccess { return &datapb.GetSegmentInfoResponse{ @@ -1027,7 +1027,7 @@ func (m *RootCoordFactory) ReportImport(ctx context.Context, req *rootcoordpb.Im if m.ReportImportErr { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, - }, fmt.Errorf("mock error") + }, fmt.Errorf("mock report import error") } if m.ReportImportNotSuccess { return &commonpb.Status{ diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go index 8136924fef..a552564d39 100644 --- a/internal/util/importutil/import_wrapper.go +++ b/internal/util/importutil/import_wrapper.go @@ -61,6 +61,9 @@ const ( MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB ) +// ReportImportAttempts is the maximum # of attempts to retry when import fails. +var ReportImportAttempts uint = 10 + type ImportWrapper struct { ctx context.Context // for canceling parse process cancel context.CancelFunc // for canceling parse process @@ -342,9 +345,9 @@ func (p *ImportWrapper) reportPersisted() error { // persist state task is valuable, retry more times in case fail this task only because of network error reportErr := retry.Do(p.ctx, func() error { return p.reportFunc(p.importResult) - }, retry.Attempts(10)) + }, retry.Attempts(ReportImportAttempts)) if reportErr != nil { - log.Warn("import wrapper: fail to report import state to root coord", zap.Error(reportErr)) + log.Warn("import wrapper: fail to report import state to RootCoord", zap.Error(reportErr)) return reportErr } return nil