mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: Support db for bulkinsert (#37012)
issue: https://github.com/milvus-io/milvus/issues/31273 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
22b917a1e6
commit
6e90f9e8d9
@ -39,6 +39,12 @@ func WithCollectionID(collectionID int64) ImportJobFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithDbID(DbID int64) ImportJobFilter {
|
||||||
|
return func(job ImportJob) bool {
|
||||||
|
return job.GetDbID() == DbID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
|
func WithJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
|
||||||
return func(job ImportJob) bool {
|
return func(job ImportJob) bool {
|
||||||
for _, state := range states {
|
for _, state := range states {
|
||||||
@ -100,6 +106,7 @@ func UpdateJobCompleteTime(completeTime string) UpdateJobAction {
|
|||||||
|
|
||||||
type ImportJob interface {
|
type ImportJob interface {
|
||||||
GetJobID() int64
|
GetJobID() int64
|
||||||
|
GetDbID() int64
|
||||||
GetCollectionID() int64
|
GetCollectionID() int64
|
||||||
GetCollectionName() string
|
GetCollectionName() string
|
||||||
GetPartitionIDs() []int64
|
GetPartitionIDs() []int64
|
||||||
|
|||||||
@ -1676,7 +1676,9 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
|||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
}
|
}
|
||||||
|
|
||||||
log := log.With(zap.Int64("collection", in.GetCollectionID()),
|
log := log.With(
|
||||||
|
zap.Int64("dbID", in.GetDbID()),
|
||||||
|
zap.Int64("collection", in.GetCollectionID()),
|
||||||
zap.Int64s("partitions", in.GetPartitionIDs()),
|
zap.Int64s("partitions", in.GetPartitionIDs()),
|
||||||
zap.Strings("channels", in.GetChannelNames()))
|
zap.Strings("channels", in.GetChannelNames()))
|
||||||
log.Info("receive import request", zap.Any("files", in.GetFiles()))
|
log.Info("receive import request", zap.Any("files", in.GetFiles()))
|
||||||
@ -1742,6 +1744,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
|||||||
job := &importJob{
|
job := &importJob{
|
||||||
ImportJob: &datapb.ImportJob{
|
ImportJob: &datapb.ImportJob{
|
||||||
JobID: idStart,
|
JobID: idStart,
|
||||||
|
DbID: in.GetDbID(),
|
||||||
CollectionID: in.GetCollectionID(),
|
CollectionID: in.GetCollectionID(),
|
||||||
CollectionName: in.GetCollectionName(),
|
CollectionName: in.GetCollectionName(),
|
||||||
PartitionIDs: in.GetPartitionIDs(),
|
PartitionIDs: in.GetPartitionIDs(),
|
||||||
@ -1768,7 +1771,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
|
func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
|
||||||
log := log.With(zap.String("jobID", in.GetJobID()))
|
log := log.With(zap.String("jobID", in.GetJobID()), zap.Int64("dbID", in.GetDbID()))
|
||||||
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
||||||
return &internalpb.GetImportProgressResponse{
|
return &internalpb.GetImportProgressResponse{
|
||||||
Status: merr.Status(err),
|
Status: merr.Status(err),
|
||||||
@ -1788,6 +1791,10 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
|
|||||||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
|
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
if job.GetDbID() != 0 && job.GetDbID() != in.GetDbID() {
|
||||||
|
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d, dbID=%d", jobID, in.GetDbID())))
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta, s.jobManager)
|
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta, s.jobManager)
|
||||||
resp.State = state
|
resp.State = state
|
||||||
resp.Reason = reason
|
resp.Reason = reason
|
||||||
@ -1818,11 +1825,14 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
|
|||||||
}
|
}
|
||||||
|
|
||||||
var jobs []ImportJob
|
var jobs []ImportJob
|
||||||
if req.GetCollectionID() != 0 {
|
filters := make([]ImportJobFilter, 0)
|
||||||
jobs = s.importMeta.GetJobBy(WithCollectionID(req.GetCollectionID()))
|
if req.GetDbID() != 0 {
|
||||||
} else {
|
filters = append(filters, WithDbID(req.GetDbID()))
|
||||||
jobs = s.importMeta.GetJobBy()
|
|
||||||
}
|
}
|
||||||
|
if req.GetCollectionID() != 0 {
|
||||||
|
filters = append(filters, WithCollectionID(req.GetCollectionID()))
|
||||||
|
}
|
||||||
|
jobs = s.importMeta.GetJobBy(filters...)
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta, s.jobManager)
|
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta, s.jobManager)
|
||||||
@ -1832,5 +1842,7 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
|
|||||||
resp.Progresses = append(resp.Progresses, progress)
|
resp.Progresses = append(resp.Progresses, progress)
|
||||||
resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName())
|
resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName())
|
||||||
}
|
}
|
||||||
|
log.Info("ListImports done", zap.Int64("collectionID", req.GetCollectionID()),
|
||||||
|
zap.Int64("dbID", req.GetDbID()), zap.Any("resp", resp))
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1412,9 +1412,10 @@ func TestImportV2(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
|
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
|
||||||
|
|
||||||
// normal case
|
// db does not exist
|
||||||
var job ImportJob = &importJob{
|
var job ImportJob = &importJob{
|
||||||
ImportJob: &datapb.ImportJob{
|
ImportJob: &datapb.ImportJob{
|
||||||
|
DbID: 1,
|
||||||
JobID: 0,
|
JobID: 0,
|
||||||
Schema: &schemapb.CollectionSchema{},
|
Schema: &schemapb.CollectionSchema{},
|
||||||
State: internalpb.ImportJobState_Failed,
|
State: internalpb.ImportJobState_Failed,
|
||||||
@ -1423,12 +1424,31 @@ func TestImportV2(t *testing.T) {
|
|||||||
err = s.importMeta.AddJob(job)
|
err = s.importMeta.AddJob(job)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
|
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
|
||||||
|
DbID: 2,
|
||||||
|
JobID: "0",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
|
||||||
|
|
||||||
|
// normal case
|
||||||
|
job = &importJob{
|
||||||
|
ImportJob: &datapb.ImportJob{
|
||||||
|
DbID: 1,
|
||||||
|
JobID: 0,
|
||||||
|
Schema: &schemapb.CollectionSchema{},
|
||||||
|
State: internalpb.ImportJobState_Pending,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = s.importMeta.AddJob(job)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
|
||||||
|
DbID: 1,
|
||||||
JobID: "0",
|
JobID: "0",
|
||||||
})
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
|
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
|
||||||
assert.Equal(t, int64(0), resp.GetProgress())
|
assert.Equal(t, int64(10), resp.GetProgress())
|
||||||
assert.Equal(t, internalpb.ImportJobState_Failed, resp.GetState())
|
assert.Equal(t, internalpb.ImportJobState_Pending, resp.GetState())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ListImports", func(t *testing.T) {
|
t.Run("ListImports", func(t *testing.T) {
|
||||||
@ -1451,6 +1471,7 @@ func TestImportV2(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
var job ImportJob = &importJob{
|
var job ImportJob = &importJob{
|
||||||
ImportJob: &datapb.ImportJob{
|
ImportJob: &datapb.ImportJob{
|
||||||
|
DbID: 2,
|
||||||
JobID: 0,
|
JobID: 0,
|
||||||
CollectionID: 1,
|
CollectionID: 1,
|
||||||
Schema: &schemapb.CollectionSchema{},
|
Schema: &schemapb.CollectionSchema{},
|
||||||
@ -1467,7 +1488,20 @@ func TestImportV2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
err = s.importMeta.AddTask(task)
|
err = s.importMeta.AddTask(task)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
// db id not match
|
||||||
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
|
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
|
||||||
|
DbID: 3,
|
||||||
|
CollectionID: 1,
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
|
||||||
|
assert.Equal(t, 0, len(resp.GetJobIDs()))
|
||||||
|
assert.Equal(t, 0, len(resp.GetStates()))
|
||||||
|
assert.Equal(t, 0, len(resp.GetReasons()))
|
||||||
|
assert.Equal(t, 0, len(resp.GetProgresses()))
|
||||||
|
// db id match
|
||||||
|
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
|
||||||
|
DbID: 2,
|
||||||
CollectionID: 1,
|
CollectionID: 1,
|
||||||
})
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|||||||
@ -139,8 +139,8 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
|
|||||||
|
|
||||||
router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
|
router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
|
||||||
router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
|
router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
|
||||||
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
|
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
|
||||||
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
|
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@ -94,11 +94,14 @@ func (req *ImportReq) GetOptions() map[string]string {
|
|||||||
return req.Options
|
return req.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
type JobIDReq struct {
|
type GetImportReq struct {
|
||||||
JobID string `json:"jobId" binding:"required"`
|
DbName string `json:"dbName"`
|
||||||
|
JobID string `json:"jobId" binding:"required"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *JobIDReq) GetJobID() string { return req.JobID }
|
func (req *GetImportReq) GetJobID() string { return req.JobID }
|
||||||
|
|
||||||
|
func (req *GetImportReq) GetDbName() string { return req.DbName }
|
||||||
|
|
||||||
type QueryReqV2 struct {
|
type QueryReqV2 struct {
|
||||||
DbName string `json:"dbName"`
|
DbName string `json:"dbName"`
|
||||||
|
|||||||
@ -345,6 +345,7 @@ message ImportResponse {
|
|||||||
message GetImportProgressRequest {
|
message GetImportProgressRequest {
|
||||||
string db_name = 1;
|
string db_name = 1;
|
||||||
string jobID = 2;
|
string jobID = 2;
|
||||||
|
int64 dbID = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ImportTaskProgress {
|
message ImportTaskProgress {
|
||||||
|
|||||||
@ -6242,6 +6242,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
|
|||||||
return &internalpb.ImportResponse{Status: merr.Status(err)}, nil
|
return &internalpb.ImportResponse{Status: merr.Status(err)}, nil
|
||||||
}
|
}
|
||||||
log := log.Ctx(ctx).With(
|
log := log.Ctx(ctx).With(
|
||||||
|
zap.String("dbName", req.GetDbName()),
|
||||||
zap.String("collectionName", req.GetCollectionName()),
|
zap.String("collectionName", req.GetCollectionName()),
|
||||||
zap.String("partition name", req.GetPartitionName()),
|
zap.String("partition name", req.GetPartitionName()),
|
||||||
zap.Any("files", req.GetFiles()),
|
zap.Any("files", req.GetFiles()),
|
||||||
@ -6267,6 +6268,11 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
|
||||||
|
if err != nil {
|
||||||
|
resp.Status = merr.Status(err)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
|
collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Status = merr.Status(err)
|
resp.Status = merr.Status(err)
|
||||||
@ -6377,6 +6383,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
importRequest := &internalpb.ImportRequestInternal{
|
importRequest := &internalpb.ImportRequestInternal{
|
||||||
|
DbID: dbInfo.dbID,
|
||||||
CollectionID: collectionID,
|
CollectionID: collectionID,
|
||||||
CollectionName: req.GetCollectionName(),
|
CollectionName: req.GetCollectionName(),
|
||||||
PartitionIDs: partitionIDs,
|
PartitionIDs: partitionIDs,
|
||||||
@ -6401,14 +6408,28 @@ func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImp
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
log := log.Ctx(ctx).With(
|
log := log.Ctx(ctx).With(
|
||||||
|
zap.String("dbName", req.GetDbName()),
|
||||||
zap.String("jobID", req.GetJobID()),
|
zap.String("jobID", req.GetJobID()),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
resp := &internalpb.GetImportProgressResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
}
|
||||||
|
|
||||||
method := "GetImportProgress"
|
method := "GetImportProgress"
|
||||||
tr := timerecord.NewTimeRecorder(method)
|
tr := timerecord.NewTimeRecorder(method)
|
||||||
log.Info(rpcReceived(method))
|
log.Info(rpcReceived(method))
|
||||||
|
|
||||||
|
// Fill db id for datacoord.
|
||||||
|
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
|
||||||
|
if err != nil {
|
||||||
|
resp.Status = merr.Status(err)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
req.DbID = dbInfo.dbID
|
||||||
|
|
||||||
nodeID := fmt.Sprint(paramtable.GetNodeID())
|
nodeID := fmt.Sprint(paramtable.GetNodeID())
|
||||||
resp, err := node.dataCoord.GetImportProgress(ctx, req)
|
resp, err = node.dataCoord.GetImportProgress(ctx, req)
|
||||||
if resp.GetStatus().GetCode() != 0 || err != nil {
|
if resp.GetStatus().GetCode() != 0 || err != nil {
|
||||||
log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err))
|
log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err))
|
||||||
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc()
|
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc()
|
||||||
@ -6445,6 +6466,11 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
|
|||||||
err error
|
err error
|
||||||
collectionID UniqueID
|
collectionID UniqueID
|
||||||
)
|
)
|
||||||
|
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
|
||||||
|
if err != nil {
|
||||||
|
resp.Status = merr.Status(err)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
if req.GetCollectionName() != "" {
|
if req.GetCollectionName() != "" {
|
||||||
collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
|
collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -6453,7 +6479,9 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{
|
resp, err = node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{
|
||||||
|
DbID: dbInfo.dbID,
|
||||||
CollectionID: collectionID,
|
CollectionID: collectionID,
|
||||||
})
|
})
|
||||||
if resp.GetStatus().GetCode() != 0 || err != nil {
|
if resp.GetStatus().GetCode() != 0 || err != nil {
|
||||||
|
|||||||
@ -1616,8 +1616,17 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
// no such collection
|
// no such database
|
||||||
mc := NewMockCache(t)
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
|
||||||
|
globalMetaCache = mc
|
||||||
|
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
|
|
||||||
|
// no such collection
|
||||||
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, mockErr)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, mockErr)
|
||||||
globalMetaCache = mc
|
globalMetaCache = mc
|
||||||
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
|
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
|
||||||
@ -1626,6 +1635,7 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
|
|
||||||
// get schema failed
|
// get schema failed
|
||||||
mc = NewMockCache(t)
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(nil, mockErr)
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(nil, mockErr)
|
||||||
globalMetaCache = mc
|
globalMetaCache = mc
|
||||||
@ -1635,6 +1645,7 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
|
|
||||||
// get channel failed
|
// get channel failed
|
||||||
mc = NewMockCache(t)
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
||||||
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
|
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
|
||||||
@ -1659,6 +1670,7 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
|
|
||||||
// get partitions failed
|
// get partitions failed
|
||||||
mc = NewMockCache(t)
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
||||||
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
|
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
|
||||||
@ -1673,6 +1685,7 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
|
|
||||||
// get partitionID failed
|
// get partitionID failed
|
||||||
mc = NewMockCache(t)
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
||||||
CollectionSchema: &schemapb.CollectionSchema{},
|
CollectionSchema: &schemapb.CollectionSchema{},
|
||||||
@ -1685,6 +1698,7 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
|
|
||||||
// no file
|
// no file
|
||||||
mc = NewMockCache(t)
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
||||||
CollectionSchema: &schemapb.CollectionSchema{},
|
CollectionSchema: &schemapb.CollectionSchema{},
|
||||||
@ -1731,7 +1745,18 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
|
// no such database
|
||||||
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
|
||||||
|
globalMetaCache = mc
|
||||||
|
rsp, err = node.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
|
|
||||||
// normal case
|
// normal case
|
||||||
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
|
globalMetaCache = mc
|
||||||
dataCoord := mocks.NewMockDataCoordClient(t)
|
dataCoord := mocks.NewMockDataCoordClient(t)
|
||||||
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(nil, nil)
|
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(nil, nil)
|
||||||
node.dataCoord = dataCoord
|
node.dataCoord = dataCoord
|
||||||
@ -1749,8 +1774,19 @@ func TestProxy_ImportV2(t *testing.T) {
|
|||||||
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
// normal case
|
// no such database
|
||||||
mc := NewMockCache(t)
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
|
||||||
|
globalMetaCache = mc
|
||||||
|
rsp, err = node.ListImports(ctx, &internalpb.ListImportsRequest{
|
||||||
|
CollectionName: "col",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||||
|
|
||||||
|
// normal case
|
||||||
|
mc = NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
globalMetaCache = mc
|
globalMetaCache = mc
|
||||||
dataCoord := mocks.NewMockDataCoordClient(t)
|
dataCoord := mocks.NewMockDataCoordClient(t)
|
||||||
|
|||||||
@ -4570,6 +4570,7 @@ func TestProxy_Import(t *testing.T) {
|
|||||||
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
mc := NewMockCache(t)
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
|
||||||
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
|
||||||
CollectionSchema: &schemapb.CollectionSchema{},
|
CollectionSchema: &schemapb.CollectionSchema{},
|
||||||
@ -4610,6 +4611,10 @@ func TestProxy_Import(t *testing.T) {
|
|||||||
proxy := &Proxy{}
|
proxy := &Proxy{}
|
||||||
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
|
globalMetaCache = mc
|
||||||
|
|
||||||
dataCoord := mocks.NewMockDataCoordClient(t)
|
dataCoord := mocks.NewMockDataCoordClient(t)
|
||||||
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{
|
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{
|
||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
@ -4635,6 +4640,10 @@ func TestProxy_Import(t *testing.T) {
|
|||||||
proxy := &Proxy{}
|
proxy := &Proxy{}
|
||||||
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
|
||||||
|
mc := NewMockCache(t)
|
||||||
|
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
|
||||||
|
globalMetaCache = mc
|
||||||
|
|
||||||
dataCoord := mocks.NewMockDataCoordClient(t)
|
dataCoord := mocks.NewMockDataCoordClient(t)
|
||||||
dataCoord.EXPECT().ListImports(mock.Anything, mock.Anything).Return(&internalpb.ListImportsResponse{
|
dataCoord.EXPECT().ListImports(mock.Anything, mock.Anything).Return(&internalpb.ListImportsResponse{
|
||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user