diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 47a21506fe..f0aa5f5670 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1881,7 +1881,13 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq Progresses: make([]int64, 0), } - jobs := s.importMeta.GetJobBy(WithCollectionID(req.GetCollectionID())) + var jobs []ImportJob + if req.GetCollectionID() != 0 { + jobs = s.importMeta.GetJobBy(WithCollectionID(req.GetCollectionID())) + } else { + jobs = s.importMeta.GetJobBy() + } + for _, job := range jobs { progress, state, reason := GetImportProgress(job.GetJobID(), s.importMeta, s.meta) resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID())) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 90055e660e..fe8b95e345 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -4200,127 +4200,127 @@ func (node *Proxy) checkHealthy() bool { return code == commonpb.StateCode_Healthy } +func convertToV2ImportRequest(req *milvuspb.ImportRequest) *internalpb.ImportRequest { + return &internalpb.ImportRequest{ + DbName: req.GetDbName(), + CollectionName: req.GetCollectionName(), + PartitionName: req.GetPartitionName(), + Files: []*internalpb.ImportFile{{ + Paths: req.GetFiles(), + }}, + Options: req.GetOptions(), + } +} + +func convertToV1ImportResponse(rsp *internalpb.ImportResponse) *milvuspb.ImportResponse { + if rsp.GetStatus().GetCode() != 0 { + return &milvuspb.ImportResponse{ + Status: rsp.GetStatus(), + } + } + jobID, err := strconv.ParseInt(rsp.GetJobID(), 10, 64) + if err != nil { + return &milvuspb.ImportResponse{ + Status: merr.Status(merr.WrapErrImportFailed(err.Error())), + } + } + return &milvuspb.ImportResponse{ + Status: rsp.GetStatus(), + Tasks: []int64{jobID}, + } +} + +func convertToV2GetImportRequest(req *milvuspb.GetImportStateRequest) *internalpb.GetImportProgressRequest { + return &internalpb.GetImportProgressRequest{ + JobID: strconv.FormatInt(req.GetTask(), 10), + } +} + +func convertToV1GetImportResponse(rsp *internalpb.GetImportProgressResponse) *milvuspb.GetImportStateResponse { + if rsp.GetStatus().GetCode() != 0 { + return &milvuspb.GetImportStateResponse{ + Status: rsp.GetStatus(), + } + } + convertState := func(state internalpb.ImportJobState) commonpb.ImportState { + switch state { + case internalpb.ImportJobState_Pending: + return commonpb.ImportState_ImportPending + case internalpb.ImportJobState_Importing: + return commonpb.ImportState_ImportStarted + case internalpb.ImportJobState_Completed: + return commonpb.ImportState_ImportCompleted + case internalpb.ImportJobState_Failed: + return commonpb.ImportState_ImportFailed + } + return commonpb.ImportState_ImportFailed + } + infos := make([]*commonpb.KeyValuePair, 0) + infos = append(infos, &commonpb.KeyValuePair{ + Key: importutil.FailedReason, + Value: rsp.GetReason(), + }) + infos = append(infos, &commonpb.KeyValuePair{ + Key: importutil.ProgressPercent, + Value: strconv.FormatInt(rsp.GetProgress(), 10), + }) + return &milvuspb.GetImportStateResponse{ + Status: rsp.GetStatus(), + State: convertState(rsp.GetState()), + RowCount: 0, + IdList: nil, + Infos: infos, + Id: 0, + CollectionId: 0, + SegmentIds: nil, + CreateTs: 0, + } +} + +func convertToV2ListImportRequest(req *milvuspb.ListImportTasksRequest) *internalpb.ListImportsRequest { + return &internalpb.ListImportsRequest{ + DbName: req.GetDbName(), + CollectionName: req.GetCollectionName(), + } +} + +func convertToV1ListImportResponse(rsp *internalpb.ListImportsResponse) *milvuspb.ListImportTasksResponse { + if rsp.GetStatus().GetCode() != 0 { + return &milvuspb.ListImportTasksResponse{ + Status: rsp.GetStatus(), + } + } + responses := make([]*milvuspb.GetImportStateResponse, 0, len(rsp.GetStates())) + for i := 0; i < len(rsp.GetStates()); i++ { + responses = append(responses, convertToV1GetImportResponse(&internalpb.GetImportProgressResponse{ + Status: rsp.GetStatus(), + State: rsp.GetStates()[i], + Reason: rsp.GetReasons()[i], + Progress: rsp.GetProgresses()[i], + })) + } + return &milvuspb.ListImportTasksResponse{ + Status: rsp.GetStatus(), + Tasks: responses, + } +} + // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) { - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Import") - defer sp.End() - - log := log.Ctx(ctx) - - log.Info("received import request", - zap.String("collectionName", req.GetCollectionName()), - zap.String("partition name", req.GetPartitionName()), - zap.Strings("files", req.GetFiles())) - resp := &milvuspb.ImportResponse{ - Status: merr.Success(), - } - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { - resp.Status = merr.Status(err) - return resp, nil - } - - err := importutil.ValidateOptions(req.GetOptions()) - if err != nil { - log.Error("failed to execute import request", - zap.Error(err)) - resp.Status = merr.Status(err) - return resp, nil - } - - method := "Import" - tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - - // Call rootCoord to finish import. - respFromRC, err := node.rootCoord.Import(ctx, req) - if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - log.Error("failed to execute bulk insert request", - zap.Error(err)) - resp.Status = merr.Status(err) - return resp, nil - } - - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return respFromRC, nil + rsp, err := node.ImportV2(ctx, convertToV2ImportRequest(req)) + return convertToV1ImportResponse(rsp), err } // GetImportState checks import task state from RootCoord. func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) { - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetImportState") - defer sp.End() - - log := log.Ctx(ctx) - - log.Debug("received get import state request", - zap.Int64("taskID", req.GetTask())) - resp := &milvuspb.GetImportStateResponse{ - Status: merr.Success(), - } - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { - resp.Status = merr.Status(err) - return resp, nil - } - method := "GetImportState" - tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - - resp, err := node.rootCoord.GetImportState(ctx, req) - if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - log.Error("failed to execute get import state", - zap.Error(err)) - resp.Status = merr.Status(err) - return resp, nil - } - - log.Debug("successfully received get import state response", - zap.Int64("taskID", req.GetTask()), - zap.Any("resp", resp), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return resp, nil + rsp, err := node.GetImportProgress(ctx, convertToV2GetImportRequest(req)) + return convertToV1GetImportResponse(rsp), err } // ListImportTasks get id array of all import tasks from rootcoord func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) { - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListImportTasks") - defer sp.End() - - log := log.Ctx(ctx) - - log.Debug("received list import tasks request") - resp := &milvuspb.ListImportTasksResponse{ - Status: merr.Success(), - } - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { - resp.Status = merr.Status(err) - return resp, nil - } - method := "ListImportTasks" - tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - resp, err := node.rootCoord.ListImportTasks(ctx, req) - if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - log.Error("failed to execute list import tasks", - zap.Error(err)) - resp.Status = merr.Status(err) - return resp, nil - } - - log.Debug("successfully received list import tasks response", - zap.String("collection", req.CollectionName), - zap.Any("tasks", lo.SliceToMap(resp.GetTasks(), func(state *milvuspb.GetImportStateResponse) (int64, commonpb.ImportState) { - return state.GetId(), state.GetState() - }))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return resp, err + rsp, err := node.ListImports(ctx, convertToV2ListImportRequest(req)) + return convertToV1ListImportResponse(rsp), err } // InvalidateCredentialCache invalidate the credential cache of specified username. @@ -5557,13 +5557,31 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) return &internalpb.ImportResponse{Status: merr.Status(err)}, nil } log := log.Ctx(ctx).With( + zap.String("collectionName", req.GetCollectionName()), + zap.String("partition name", req.GetPartitionName()), + zap.Any("files", req.GetFiles()), zap.String("role", typeutil.ProxyRole), ) - method := "ImportV2" - log.Info(rpcReceived(method)) + resp := &internalpb.ImportResponse{ Status: merr.Success(), } + + method := "ImportV2" + tr := timerecord.NewTimeRecorder(method) + log.Info(rpcReceived(method)) + + nodeID := fmt.Sprint(paramtable.GetNodeID()) + defer func() { + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + if resp.GetStatus().GetCode() != 0 { + log.Warn("import failed", zap.String("err", resp.GetStatus().GetReason())) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + } else { + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + } + }() + collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { resp.Status = merr.Status(err) @@ -5628,7 +5646,13 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) Files: req.GetFiles(), Options: req.GetOptions(), } - return node.dataCoord.ImportV2(ctx, importRequest) + resp, err = node.dataCoord.ImportV2(ctx, importRequest) + if err != nil { + log.Warn("import failed", zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + } + metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return resp, err } func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) { @@ -5637,7 +5661,24 @@ func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImp Status: merr.Status(err), }, nil } - return node.dataCoord.GetImportProgress(ctx, req) + log := log.Ctx(ctx).With( + zap.String("jobID", req.GetJobID()), + ) + method := "GetImportProgress" + tr := timerecord.NewTimeRecorder(method) + log.Info(rpcReceived(method)) + + nodeID := fmt.Sprint(paramtable.GetNodeID()) + resp, err := node.dataCoord.GetImportProgress(ctx, req) + if resp.GetStatus().GetCode() != 0 || err != nil { + log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + } else { + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + } + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return resp, err } func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsRequest) (*internalpb.ListImportsResponse, error) { @@ -5649,12 +5690,39 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR resp := &internalpb.ListImportsResponse{ Status: merr.Success(), } - collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) - if err != nil { - resp.Status = merr.Status(err) - return resp, nil + + log := log.Ctx(ctx).With( + zap.String("dbName", req.GetDbName()), + zap.String("collectionName", req.GetCollectionName()), + ) + method := "ListImports" + tr := timerecord.NewTimeRecorder(method) + log.Info(rpcReceived(method)) + + nodeID := fmt.Sprint(paramtable.GetNodeID()) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + + var ( + err error + collectionID UniqueID + ) + if req.GetCollectionName() != "" { + collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) + if err != nil { + resp.Status = merr.Status(err) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + return resp, nil + } } - return node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{ + resp, err = node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{ CollectionID: collectionID, }) + if resp.GetStatus().GetCode() != 0 || err != nil { + log.Warn("list imports", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + } else { + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + } + metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return resp, nil } diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 1228cf1d98..18eb04d9d9 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1615,7 +1615,9 @@ func TestProxy_ImportV2(t *testing.T) { dataCoord := mocks.NewMockDataCoordClient(t) dataCoord.EXPECT().ListImports(mock.Anything, mock.Anything).Return(nil, nil) node.dataCoord = dataCoord - rsp, err = node.ListImports(ctx, &internalpb.ListImportsRequest{}) + rsp, err = node.ListImports(ctx, &internalpb.ListImportsRequest{ + CollectionName: "col", + }) assert.NoError(t, err) assert.Equal(t, int32(0), rsp.GetStatus().GetCode()) }) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 783aaed1fe..30df1c3af0 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -60,7 +60,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -1956,32 +1955,6 @@ func TestProxy(t *testing.T) { time.Sleep(2 * time.Second) }) - wg.Add(1) - t.Run("test import collection ID not found", func(t *testing.T) { - defer wg.Done() - req := &milvuspb.ImportRequest{ - CollectionName: "bad_collection_name", - Files: []string{"f1.json"}, - } - proxy.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err := proxy.Import(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - }) - - wg.Add(1) - t.Run("test import get vChannel fail", func(t *testing.T) { - defer wg.Done() - req := &milvuspb.ImportRequest{ - CollectionName: "bad_collection_name", - Files: []string{"f1.json"}, - } - proxy.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err := proxy.Import(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - }) - wg.Add(1) t.Run("release collection", func(t *testing.T) { defer wg.Done() @@ -4494,135 +4467,102 @@ func TestProxy_GetComponentStates(t *testing.T) { } func TestProxy_Import(t *testing.T) { - var wg sync.WaitGroup + cache := globalMetaCache + defer func() { globalMetaCache = cache }() - wg.Add(1) - t.Run("test import with unhealthy", func(t *testing.T) { - defer wg.Done() - req := &milvuspb.ImportRequest{ - CollectionName: "dummy", - } + t.Run("Import failed", func(t *testing.T) { proxy := &Proxy{} proxy.UpdateStateCode(commonpb.StateCode_Abnormal) + + req := &milvuspb.ImportRequest{} resp, err := proxy.Import(context.TODO(), req) assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) - wg.Add(1) - t.Run("rootcoord fail", func(t *testing.T) { - defer wg.Done() + t.Run("Import", func(t *testing.T) { proxy := &Proxy{} proxy.UpdateStateCode(commonpb.StateCode_Healthy) + + mc := NewMockCache(t) + mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil) + mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{ + CollectionSchema: &schemapb.CollectionSchema{}, + }, nil) + mc.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(0, nil) + globalMetaCache = mc + chMgr := NewMockChannelsMgr(t) + chMgr.EXPECT().getVChannels(mock.Anything).Return(nil, nil) proxy.chMgr = chMgr - rc := newMockRootCoord() - rc.ImportFunc = func(ctx context.Context, req *milvuspb.ImportRequest, opts ...grpc.CallOption) (*milvuspb.ImportResponse, error) { - return nil, errors.New("mock") - } - proxy.rootCoord = rc + + dataCoord := mocks.NewMockDataCoordClient(t) + dataCoord.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(&internalpb.ImportResponse{ + Status: merr.Success(), + JobID: "100", + }, nil) + proxy.dataCoord = dataCoord + req := &milvuspb.ImportRequest{ CollectionName: "dummy", + Files: []string{"a.json"}, } resp, err := proxy.Import(context.TODO(), req) assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + assert.Equal(t, int32(0), resp.GetStatus().GetCode()) }) - wg.Add(1) - t.Run("normal case", func(t *testing.T) { - defer wg.Done() + t.Run("GetImportState failed", func(t *testing.T) { proxy := &Proxy{} - proxy.UpdateStateCode(commonpb.StateCode_Healthy) - chMgr := NewMockChannelsMgr(t) - proxy.chMgr = chMgr - rc := newMockRootCoord() - rc.ImportFunc = func(ctx context.Context, req *milvuspb.ImportRequest, opts ...grpc.CallOption) (*milvuspb.ImportResponse, error) { - return &milvuspb.ImportResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil - } - proxy.rootCoord = rc - req := &milvuspb.ImportRequest{ - CollectionName: "dummy", - } - resp, err := proxy.Import(context.TODO(), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - }) - - wg.Add(1) - t.Run("illegal import options", func(t *testing.T) { - defer wg.Done() - proxy := &Proxy{} - proxy.UpdateStateCode(commonpb.StateCode_Healthy) - chMgr := NewMockChannelsMgr(t) - proxy.chMgr = chMgr - rc := newMockRootCoord() - rc.ImportFunc = func(ctx context.Context, req *milvuspb.ImportRequest, opts ...grpc.CallOption) (*milvuspb.ImportResponse, error) { - return &milvuspb.ImportResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil - } - proxy.rootCoord = rc - req := &milvuspb.ImportRequest{ - CollectionName: "dummy", - Options: []*commonpb.KeyValuePair{ - { - Key: importutil.StartTs, - Value: "0", - }, - { - Key: importutil.EndTs, - Value: "not a number", - }, - }, - } - resp, err := proxy.Import(context.TODO(), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - }) - wg.Wait() -} - -func TestProxy_GetImportState(t *testing.T) { - req := &milvuspb.GetImportStateRequest{ - Task: 1, - } - rootCoord := &RootCoordMock{} - rootCoord.state.Store(commonpb.StateCode_Healthy) - t.Run("test get import state", func(t *testing.T) { - proxy := &Proxy{rootCoord: rootCoord} - proxy.UpdateStateCode(commonpb.StateCode_Healthy) - - resp, err := proxy.GetImportState(context.TODO(), req) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.NoError(t, err) - }) - t.Run("test get import state with unhealthy", func(t *testing.T) { - proxy := &Proxy{rootCoord: rootCoord} proxy.UpdateStateCode(commonpb.StateCode_Abnormal) + + req := &milvuspb.GetImportStateRequest{} resp, err := proxy.GetImportState(context.TODO(), req) + assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - assert.NoError(t, err) }) -} -func TestProxy_ListImportTasks(t *testing.T) { - req := &milvuspb.ListImportTasksRequest{} - rootCoord := &RootCoordMock{} - rootCoord.state.Store(commonpb.StateCode_Healthy) - t.Run("test list import tasks", func(t *testing.T) { - proxy := &Proxy{rootCoord: rootCoord} + t.Run("GetImportState", func(t *testing.T) { + proxy := &Proxy{} proxy.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err := proxy.ListImportTasks(context.TODO(), req) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + dataCoord := mocks.NewMockDataCoordClient(t) + dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{ + Status: merr.Success(), + }, nil) + proxy.dataCoord = dataCoord + + req := &milvuspb.GetImportStateRequest{} + resp, err := proxy.GetImportState(context.TODO(), req) assert.NoError(t, err) + assert.Equal(t, int32(0), resp.GetStatus().GetCode()) }) - t.Run("test list import tasks with unhealthy", func(t *testing.T) { - proxy := &Proxy{rootCoord: rootCoord} + + t.Run("ListImportTasks failed", func(t *testing.T) { + proxy := &Proxy{} proxy.UpdateStateCode(commonpb.StateCode_Abnormal) + + req := &milvuspb.ListImportTasksRequest{} resp, err := proxy.ListImportTasks(context.TODO(), req) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) }) + + t.Run("ListImportTasks", func(t *testing.T) { + proxy := &Proxy{} + proxy.UpdateStateCode(commonpb.StateCode_Healthy) + + dataCoord := mocks.NewMockDataCoordClient(t) + dataCoord.EXPECT().ListImports(mock.Anything, mock.Anything).Return(&internalpb.ListImportsResponse{ + Status: merr.Success(), + }, nil) + proxy.dataCoord = dataCoord + + req := &milvuspb.ListImportTasksRequest{} + resp, err := proxy.ListImportTasks(context.TODO(), req) + assert.NoError(t, err) + assert.Equal(t, int32(0), resp.GetStatus().GetCode()) + }) } func TestProxy_RelatedPrivilege(t *testing.T) { diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py index c4c3601bdd..1122f0f6fb 100644 --- a/tests/python_client/testcases/test_bulk_insert.py +++ b/tests/python_client/testcases/test_bulk_insert.py @@ -1220,9 +1220,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data error = {} if file_nums == 0: - error = {ct.err_code: 1100, ct.err_msg: "import request is empty: invalid parameter"} + error = {ct.err_code: 1100, ct.err_msg: "import request is empty"} if file_nums > 1: - error = {ct.err_code: 65535, ct.err_msg: "for JSON or parquet file, each task only accepts one file"} + error = {ct.err_code: 65535, ct.err_msg: "for Parquet import, accepts only one file"} self.utility_wrap.do_bulk_insert( collection_name=c_name, files=files, check_task=CheckTasks.err_res, check_items=error