From aa96843d316400bc2fccbbee9b01db87b193851f Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sat, 13 Apr 2024 22:03:23 +0800 Subject: [PATCH] fix: Fix import hanging and improve logging output (#32166) Fix import hanging when the previous import task failed, and improve parquet import logging outout. issue: https://github.com/milvus-io/milvus/issues/31834 Signed-off-by: bigsheeper --- internal/datanode/importv2/scheduler.go | 2 +- internal/util/importutilv2/parquet/util.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go index 9e9b8e61d4..edbd92f001 100644 --- a/internal/datanode/importv2/scheduler.go +++ b/internal/datanode/importv2/scheduler.go @@ -98,7 +98,7 @@ func (s *scheduler) Start() { for taskID, fs := range futures { err := conc.AwaitAll(fs...) if err != nil { - return + continue } s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed)) log.Info("preimport/import done", zap.Int64("taskID", taskID)) diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index 6accba57c5..766f94419f 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -76,6 +76,9 @@ func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, sch } } if !isConvertible(arrowType, dataType, isList) { + if isList { + return nil, WrapTypeErr(dataType.String(), pqField.Type.(*arrow.ListType).ElemField().Type.Name(), field) + } return nil, WrapTypeErr(dataType.String(), pqField.Type.Name(), field) }