Revert parallel ddl (#20118)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2022-10-28 13:25:34 +08:00 committed by GitHub
parent 6224b3c44c
commit 89541a6f68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1093 additions and 423 deletions

View File

@ -0,0 +1,33 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
)
// describeCollectionTask describe collection request task
type describeCollectionTask struct {
baseTask
Req *milvuspb.DescribeCollectionRequest
Rsp *milvuspb.DescribeCollectionResponse
}
func (t *describeCollectionTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.Base.MsgType, commonpb.MsgType_DescribeCollection); err != nil {
return err
}
return nil
}
// Execute task execution
func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
coll, err := t.core.describeCollection(ctx, t.Req)
if err != nil {
return err
}
aliases := t.core.meta.ListAliasesByID(coll.CollectionID)
t.Rsp = convertModelToDesc(coll, aliases)
return nil
}

View File

@ -0,0 +1,111 @@
package rootcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
)
func Test_describeCollectionTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &describeCollectionTask{
Req: &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &describeCollectionTask{
Req: &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_describeCollectionTask_Execute(t *testing.T) {
t.Run("failed to get collection by name", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &describeCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionName: "test coll",
},
Rsp: &milvuspb.DescribeCollectionResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to get collection by id", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &describeCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: 1,
},
Rsp: &milvuspb.DescribeCollectionResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByIDFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
return &model.Collection{
CollectionID: 1,
Name: "test coll",
}, nil
}
alias1, alias2 := funcutil.GenRandomStr(), funcutil.GenRandomStr()
meta.ListAliasesByIDFunc = func(collID UniqueID) []string {
return []string{alias1, alias2}
}
core := newTestCore(withMeta(meta))
task := &describeCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: 1,
},
Rsp: &milvuspb.DescribeCollectionResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.ElementsMatch(t, []string{alias1, alias2}, task.Rsp.GetAliases())
})
}

View File

@ -60,6 +60,13 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
redoTask := newBaseRedoTask(t.core.stepExecutor)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
ts: ts,
opts: []expireCacheOpt{expireCacheWithDropFlag()},
})
redoTask.AddSyncStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
@ -67,13 +74,6 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
ts: ts,
})
redoTask.AddAsyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
ts: ts,
opts: []expireCacheOpt{expireCacheWithDropFlag()},
})
redoTask.AddAsyncStep(&releaseCollectionStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,

View File

@ -95,35 +95,29 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
})
t.Run("failed to expire cache", func(t *testing.T) {
//collectionName := funcutil.GenRandomStr()
//coll := &model.Collection{Name: collectionName}
//
//meta := mockrootcoord.NewIMetaTable(t)
//meta.On("GetCollectionByName",
// mock.Anything, // context.Context
// mock.AnythingOfType("string"),
// mock.AnythingOfType("uint64"),
//).Return(coll.Clone(), nil)
//meta.On("ListAliasesByID",
// mock.AnythingOfType("int64"),
//).Return([]string{})
//meta.On("ChangeCollectionState",
// mock.Anything, // context.Context
// mock.AnythingOfType("int64"),
// mock.Anything, // etcdpb.CollectionState
// mock.AnythingOfType("uint64"),
//).Return(nil)
//
//core := newTestCore(withInvalidProxyManager(), withMeta(meta))
//task := &dropCollectionTask{
// baseTask: baseTask{core: core},
// Req: &milvuspb.DropCollectionRequest{
// Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
// CollectionName: collectionName,
// },
//}
//err := task.Execute(context.Background())
//assert.Error(t, err)
collectionName := funcutil.GenRandomStr()
coll := &model.Collection{Name: collectionName}
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64"),
).Return(coll.Clone(), nil)
meta.On("ListAliasesByID",
mock.AnythingOfType("int64"),
).Return([]string{})
core := newTestCore(withInvalidProxyManager(), withMeta(meta))
task := &dropCollectionTask{
baseTask: baseTask{core: core},
Req: &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
CollectionName: collectionName,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to change collection state", func(t *testing.T) {

View File

@ -54,6 +54,13 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
}
redoTask := newBaseRedoTask(t.core.stepExecutor)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
ts: t.GetTs(),
})
redoTask.AddSyncStep(&changePartitionStateStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
@ -62,13 +69,6 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
ts: t.GetTs(),
})
redoTask.AddAsyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
ts: t.GetTs(),
})
redoTask.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: t.core},
collID: t.collMeta.CollectionID,

View File

@ -89,29 +89,21 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
})
t.Run("failed to expire cache", func(t *testing.T) {
//collectionName := funcutil.GenRandomStr()
//partitionName := funcutil.GenRandomStr()
//coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}}
//meta := mockrootcoord.NewIMetaTable(t)
//meta.On("ChangePartitionState",
// mock.Anything, // context.Context
// mock.AnythingOfType("int64"),
// mock.AnythingOfType("int64"),
// mock.Anything, // pb.PartitionState
// mock.AnythingOfType("uint64"),
//).Return(nil)
//core := newTestCore(withInvalidProxyManager(), withMeta(meta))
//task := &dropPartitionTask{
// baseTask: baseTask{core: core},
// Req: &milvuspb.DropPartitionRequest{
// Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
// CollectionName: collectionName,
// PartitionName: partitionName,
// },
// collMeta: coll.Clone(),
//}
//err := task.Execute(context.Background())
//assert.Error(t, err)
collectionName := funcutil.GenRandomStr()
partitionName := funcutil.GenRandomStr()
coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}}
core := newTestCore(withInvalidProxyManager())
task := &dropPartitionTask{
baseTask: baseTask{core: core},
Req: &milvuspb.DropPartitionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
CollectionName: collectionName,
PartitionName: partitionName,
},
collMeta: coll.Clone(),
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to change partition state", func(t *testing.T) {

View File

@ -0,0 +1,32 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
)
// hasCollectionTask has collection request task
type hasCollectionTask struct {
baseTask
Req *milvuspb.HasCollectionRequest
Rsp *milvuspb.BoolResponse
}
func (t *hasCollectionTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.Base.MsgType, commonpb.MsgType_HasCollection); err != nil {
return err
}
return nil
}
// Execute task execution
func (t *hasCollectionTask) Execute(ctx context.Context) error {
t.Rsp.Status = succStatus()
ts := getTravelTs(t.Req)
// TODO: what if err != nil && common.IsCollectionNotExistError == false, should we consider this RPC as failure?
_, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetCollectionName(), ts)
t.Rsp.Value = err == nil
return nil
}

View File

@ -0,0 +1,84 @@
package rootcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/stretchr/testify/assert"
)
func Test_hasCollectionTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &hasCollectionTask{
Req: &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
CollectionName: "test coll",
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &hasCollectionTask{
Req: &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_hasCollectionTask_Execute(t *testing.T) {
t.Run("failed", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &hasCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
},
},
Rsp: &milvuspb.BoolResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.False(t, task.Rsp.GetValue())
})
t.Run("success", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
return nil, nil
}
core := newTestCore(withMeta(meta))
task := &hasCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
},
},
Rsp: &milvuspb.BoolResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.True(t, task.Rsp.GetValue())
})
}

View File

@ -0,0 +1,42 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// hasPartitionTask has partition request task
type hasPartitionTask struct {
baseTask
Req *milvuspb.HasPartitionRequest
Rsp *milvuspb.BoolResponse
}
func (t *hasPartitionTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.Base.MsgType, commonpb.MsgType_HasPartition); err != nil {
return err
}
return nil
}
// Execute task execution
func (t *hasPartitionTask) Execute(ctx context.Context) error {
t.Rsp.Status = succStatus()
t.Rsp.Value = false
// TODO: why HasPartitionRequest doesn't contain Timestamp but other requests do.
coll, err := t.core.meta.GetCollectionByName(ctx, t.Req.CollectionName, typeutil.MaxTimestamp)
if err != nil {
t.Rsp.Status = failStatus(commonpb.ErrorCode_CollectionNotExists, err.Error())
return err
}
for _, part := range coll.Partitions {
if part.PartitionName == t.Req.PartitionName {
t.Rsp.Value = true
break
}
}
return nil
}

View File

@ -0,0 +1,127 @@
package rootcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/stretchr/testify/assert"
)
func Test_hasPartitionTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &hasPartitionTask{
Req: &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &hasPartitionTask{
Req: &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasPartition,
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_hasPartitionTask_Execute(t *testing.T) {
t.Run("fail to get collection", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &hasPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasPartition,
},
CollectionName: "test coll",
},
Rsp: &milvuspb.BoolResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_CollectionNotExists)
assert.False(t, task.Rsp.GetValue())
})
t.Run("failed", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
return &model.Collection{
Partitions: []*model.Partition{
{
PartitionName: "invalid test partition",
},
},
}, nil
}
core := newTestCore(withMeta(meta))
task := &hasPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
},
CollectionName: "test coll",
PartitionName: "test partition",
},
Rsp: &milvuspb.BoolResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.False(t, task.Rsp.GetValue())
})
t.Run("success", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
return &model.Collection{
Partitions: []*model.Partition{
{
PartitionName: "invalid test partition",
},
{
PartitionName: "test partition",
},
},
}, nil
}
core := newTestCore(withMeta(meta))
task := &hasPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
},
CollectionName: "test coll",
PartitionName: "test partition",
},
Rsp: &milvuspb.BoolResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.True(t, task.Rsp.GetValue())
})
}

View File

@ -869,16 +869,36 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
log.Info("received request to has collection")
_, err := c.meta.GetCollectionByName(ctx, in.GetCollectionName(), ts)
// TODO: what if err != nil && common.IsCollectionNotExistError == false, should we consider this RPC as failure?
has := err == nil
t := &hasCollectionTask{
baseTask: newBaseTask(ctx, c),
Req: in,
Rsp: &milvuspb.BoolResponse{},
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to has collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
Value: false,
}, nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to has collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
Value: false,
}, nil
}
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to has collection", zap.Bool("exist", has))
log.Info("done to has collection", zap.Bool("exist", t.Rsp.GetValue()))
return &milvuspb.BoolResponse{Status: succStatus(), Value: has}, nil
return t.Rsp, nil
}
func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*model.Collection, error) {
@ -935,28 +955,36 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
log.Info("received request to describe collection")
coll, err := c.describeCollection(ctx, in)
if err != nil {
// TODO: check whether err indicates the collection not exist.
t := &describeCollectionTask{
baseTask: newBaseTask(ctx, c),
Req: in,
Rsp: &milvuspb.DescribeCollectionResponse{},
}
log.Error("failed to describe collection", zap.Error(err))
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to describe collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
return &milvuspb.DescribeCollectionResponse{
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
}, nil
}
aliases := c.meta.ListAliasesByID(coll.CollectionID)
desc := convertModelToDesc(coll, aliases)
if err := t.WaitToFinish(); err != nil {
log.Error("failed to describe collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
return &milvuspb.DescribeCollectionResponse{
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
}, nil
}
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to describe collection", zap.Int64("collection_id", desc.GetCollectionID()))
log.Info("done to describe collection", zap.Int64("collection_id", t.Rsp.GetCollectionID()))
return desc, nil
return t.Rsp, nil
}
// ShowCollections list all collection names
@ -970,34 +998,39 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("ShowCollections")
resp := &milvuspb.ShowCollectionsResponse{Status: succStatus()}
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log.Info("received request to show collections")
colls, err := c.meta.ListCollections(ctx, ts)
if err != nil {
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
resp.Status = failStatus(commonpb.ErrorCode_UnexpectedError, err.Error())
log.Error("failed to show collections", zap.Error(err))
return resp, nil
t := &showCollectionTask{
baseTask: newBaseTask(ctx, c),
Req: in,
Rsp: &milvuspb.ShowCollectionsResponse{},
}
for _, meta := range colls {
resp.CollectionNames = append(resp.CollectionNames, meta.Name)
resp.CollectionIds = append(resp.CollectionIds, meta.CollectionID)
resp.CreatedTimestamps = append(resp.CreatedTimestamps, meta.CreateTime)
physical, _ := tsoutil.ParseHybridTs(meta.CreateTime)
resp.CreatedUtcTimestamps = append(resp.CreatedUtcTimestamps, uint64(physical))
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to show collections", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
return &milvuspb.ShowCollectionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
}, nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to show collections", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
return &milvuspb.ShowCollectionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
}, nil
}
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to show collections", zap.Int("num of collections", len(resp.GetCollectionNames()))) // maybe very large, print number instead.
log.Info("done to show collections", zap.Int("num of collections", len(t.Rsp.GetCollectionNames()))) // maybe very large, print number instead.
return resp, nil
return t.Rsp, nil
}
func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
@ -1164,34 +1197,40 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
// TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do.
ts := typeutil.MaxTimestamp
resp := &milvuspb.BoolResponse{Status: succStatus(), Value: false}
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log.Info("received request to has partition")
coll, err := c.meta.GetCollectionByName(ctx, in.GetCollectionName(), ts)
if err != nil {
// TODO: check if err indicates collection not exist.
log.Error("failed to has partition", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
resp.Status = failStatus(commonpb.ErrorCode_UnexpectedError, err.Error())
return resp, nil
t := &hasPartitionTask{
baseTask: newBaseTask(ctx, c),
Req: in,
Rsp: &milvuspb.BoolResponse{},
}
for _, part := range coll.Partitions {
if part.PartitionName == in.GetPartitionName() {
resp.Value = true
break
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to has partition", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
Value: false,
}, nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to has partition", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
Value: false,
}, nil
}
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to has partition", zap.Bool("exist", resp.GetValue()))
log.Info("done to has partition", zap.Bool("exist", t.Rsp.GetValue()))
return resp, nil
return t.Rsp, nil
}
// ShowPartitions list all partition names
@ -1205,45 +1244,38 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("ShowPartitions")
// TODO(longjiquan): why ShowPartitionsRequest doesn't contain Timestamp but other requests do.
ts := typeutil.MaxTimestamp
resp := &milvuspb.ShowPartitionsResponse{Status: succStatus()}
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Info("received request to show partitions")
var coll *model.Collection
var err error
if in.GetCollectionName() == "" {
coll, err = c.meta.GetCollectionByID(ctx, in.GetCollectionID(), ts)
} else {
coll, err = c.meta.GetCollectionByName(ctx, in.GetCollectionName(), ts)
t := &showPartitionTask{
baseTask: newBaseTask(ctx, c),
Req: in,
Rsp: &milvuspb.ShowPartitionsResponse{},
}
if err != nil {
// TODO: check if err indicates collection not exist.
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to show partitions", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
return &milvuspb.ShowPartitionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
}, nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to show partitions", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
resp.Status = failStatus(commonpb.ErrorCode_UnexpectedError, err.Error())
return resp, nil
}
for _, part := range coll.Partitions {
resp.PartitionIDs = append(resp.PartitionIDs, part.PartitionID)
resp.PartitionNames = append(resp.PartitionNames, part.PartitionName)
resp.CreatedTimestamps = append(resp.CreatedTimestamps, part.PartitionCreatedTimestamp)
physical, _ := tsoutil.ParseHybridTs(part.PartitionCreatedTimestamp)
resp.CreatedUtcTimestamps = append(resp.CreatedUtcTimestamps, uint64(physical))
return &milvuspb.ShowPartitionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
}, nil
}
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to show partitions", zap.Strings("partitions", resp.GetPartitionNames()))
log.Info("done to show partitions", zap.Strings("partitions", t.Rsp.GetPartitionNames()))
return resp, nil
return t.Rsp, nil
}
// ShowSegments list all segments

View File

@ -17,11 +17,9 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -30,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestRootCoord_CreateCollection(t *testing.T) {
@ -113,177 +110,6 @@ func TestRootCoord_DropCollection(t *testing.T) {
})
}
func TestRootCoord_DescribeCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to get collection by name", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64")).
Return(nil, errors.New("error mock GetCollectionByName"))
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: "test"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to get collection by id", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByID",
mock.Anything, // context.Context
mock.AnythingOfType("int64"),
mock.AnythingOfType("uint64")).
Return(nil, errors.New("error mock GetCollectionByID"))
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionID: 100})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64")).
Return(&model.Collection{CollectionID: 100}, nil)
meta.On("ListAliasesByID",
mock.AnythingOfType("int64")).
Return([]string{"alias1", "alias2"})
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: "test"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, UniqueID(100), resp.GetCollectionID())
assert.ElementsMatch(t, []string{"alias1", "alias2"}, resp.GetAliases())
})
}
func TestRootCoord_HasCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64"),
).Return(func(ctx context.Context, collectionName string, ts Timestamp) *model.Collection {
if ts == typeutil.MaxTimestamp {
return &model.Collection{}
}
return nil
}, func(ctx context.Context, collectionName string, ts Timestamp) error {
if ts == typeutil.MaxTimestamp {
return nil
}
return errors.New("error mock GetCollectionByName")
})
c := newTestCore(withHealthyCode(), withMeta(meta))
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.True(t, resp.GetValue())
resp, err = c.HasCollection(ctx, &milvuspb.HasCollectionRequest{TimeStamp: typeutil.MaxTimestamp})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.True(t, resp.GetValue())
resp, err = c.HasCollection(ctx, &milvuspb.HasCollectionRequest{TimeStamp: 100})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.False(t, resp.GetValue())
})
}
func TestRootCoord_ShowCollections(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("ListCollections",
mock.Anything, // context.Context
mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, ts Timestamp) []*model.Collection {
if ts == typeutil.MaxTimestamp {
return []*model.Collection{
{
CollectionID: 100,
Name: "test",
State: pb.CollectionState_CollectionCreated,
},
}
}
return nil
}, func(ctx context.Context, ts Timestamp) error {
if ts == typeutil.MaxTimestamp {
return nil
}
return errors.New("error mock ListCollections")
})
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, []int64{100}, resp.GetCollectionIds())
assert.ElementsMatch(t, []string{"test"}, resp.GetCollectionNames())
resp, err = c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{TimeStamp: typeutil.MaxTimestamp})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, []int64{100}, resp.GetCollectionIds())
assert.ElementsMatch(t, []string{"test"}, resp.GetCollectionNames())
resp, err = c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{TimeStamp: 10000})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_CreatePartition(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
@ -364,119 +190,6 @@ func TestRootCoord_DropPartition(t *testing.T) {
})
}
func TestRootCoord_HasPartition(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64"),
).Return(func(ctx context.Context, collectionName string, ts Timestamp) *model.Collection {
if collectionName == "test1" {
return &model.Collection{Partitions: []*model.Partition{{PartitionName: "test_partition"}}}
}
return nil
}, func(ctx context.Context, collectionName string, ts Timestamp) error {
if collectionName == "test1" {
return nil
}
return errors.New("error mock GetCollectionByName")
})
c := newTestCore(withHealthyCode(), withMeta(meta))
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "error_case"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
resp, err = c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "test1", PartitionName: "test_partition"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.True(t, resp.GetValue())
resp, err = c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "test1", PartitionName: "non_exist"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.False(t, resp.GetValue())
})
}
func TestRootCoord_ShowPartitions(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to get collection by name", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64")).
Return(nil, errors.New("error mock GetCollectionByName"))
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionName: "test"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to get collection by id", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByID",
mock.Anything, // context.Context
mock.AnythingOfType("int64"),
mock.AnythingOfType("uint64")).
Return(nil, errors.New("error mock GetCollectionByID"))
c := newTestCore(withHealthyCode(),
withMeta(meta))
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionID: 100})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("uint64"),
).Return(&model.Collection{Partitions: []*model.Partition{{
PartitionName: "test_partition",
PartitionID: 102,
}}}, nil)
c := newTestCore(withHealthyCode(), withMeta(meta))
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionName: "test"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, []string{"test_partition"}, resp.GetPartitionNames())
assert.ElementsMatch(t, []int64{102}, resp.GetPartitionIDs())
})
}
func TestRootCoord_CreateAlias(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
@ -591,6 +304,204 @@ func TestRootCoord_AlterAlias(t *testing.T) {
})
}
func TestRootCoord_DescribeCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_HasCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_ShowCollections(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_HasPartition(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_ShowPartitions(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestRootCoord_AllocTimestamp(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()

View File

@ -0,0 +1,46 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// showCollectionTask show collection request task
type showCollectionTask struct {
baseTask
Req *milvuspb.ShowCollectionsRequest
Rsp *milvuspb.ShowCollectionsResponse
}
func (t *showCollectionTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.Base.MsgType, commonpb.MsgType_ShowCollections); err != nil {
return err
}
return nil
}
// Execute task execution
func (t *showCollectionTask) Execute(ctx context.Context) error {
t.Rsp.Status = succStatus()
ts := t.Req.GetTimeStamp()
if ts == 0 {
ts = typeutil.MaxTimestamp
}
colls, err := t.core.meta.ListCollections(ctx, ts)
if err != nil {
t.Rsp.Status = failStatus(commonpb.ErrorCode_UnexpectedError, err.Error())
return err
}
for _, meta := range colls {
t.Rsp.CollectionNames = append(t.Rsp.CollectionNames, meta.Name)
t.Rsp.CollectionIds = append(t.Rsp.CollectionIds, meta.CollectionID)
t.Rsp.CreatedTimestamps = append(t.Rsp.CreatedTimestamps, meta.CreateTime)
physical, _ := tsoutil.ParseHybridTs(meta.CreateTime)
t.Rsp.CreatedUtcTimestamps = append(t.Rsp.CreatedUtcTimestamps, uint64(physical))
}
return nil
}

View File

@ -0,0 +1,88 @@
package rootcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/stretchr/testify/assert"
)
func Test_showCollectionTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &showCollectionTask{
Req: &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &showCollectionTask{
Req: &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_showCollectionTask_Execute(t *testing.T) {
t.Run("failed to list collections", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &showCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
},
},
Rsp: &milvuspb.ShowCollectionsResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
meta := newMockMetaTable()
meta.ListCollectionsFunc = func(ctx context.Context, ts Timestamp) ([]*model.Collection, error) {
return []*model.Collection{
{
Name: "test coll",
},
{
Name: "test coll2",
},
}, nil
}
core := newTestCore(withMeta(meta))
task := &showCollectionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
},
},
Rsp: &milvuspb.ShowCollectionsResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, task.Rsp.GetStatus().GetErrorCode())
assert.Equal(t, 2, len(task.Rsp.GetCollectionNames()))
})
}

View File

@ -0,0 +1,51 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// showPartitionTask show partition request task
type showPartitionTask struct {
baseTask
Req *milvuspb.ShowPartitionsRequest
Rsp *milvuspb.ShowPartitionsResponse
}
func (t *showPartitionTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.Base.MsgType, commonpb.MsgType_ShowPartitions); err != nil {
return err
}
return nil
}
// Execute task execution
func (t *showPartitionTask) Execute(ctx context.Context) error {
var coll *model.Collection
var err error
t.Rsp.Status = succStatus()
if t.Req.GetCollectionName() == "" {
coll, err = t.core.meta.GetCollectionByID(ctx, t.Req.GetCollectionID(), typeutil.MaxTimestamp)
} else {
coll, err = t.core.meta.GetCollectionByName(ctx, t.Req.GetCollectionName(), typeutil.MaxTimestamp)
}
if err != nil {
t.Rsp.Status = failStatus(commonpb.ErrorCode_CollectionNotExists, err.Error())
return err
}
for _, part := range coll.Partitions {
t.Rsp.PartitionIDs = append(t.Rsp.PartitionIDs, part.PartitionID)
t.Rsp.PartitionNames = append(t.Rsp.PartitionNames, part.PartitionName)
t.Rsp.CreatedTimestamps = append(t.Rsp.CreatedTimestamps, part.PartitionCreatedTimestamp)
physical, _ := tsoutil.ParseHybridTs(part.PartitionCreatedTimestamp)
t.Rsp.CreatedUtcTimestamps = append(t.Rsp.CreatedUtcTimestamps, uint64(physical))
}
return nil
}

View File

@ -0,0 +1,118 @@
package rootcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func Test_showPartitionTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &showPartitionTask{
Req: &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &showPartitionTask{
Req: &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_showPartitionTask_Execute(t *testing.T) {
t.Run("failed to list collections by name", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &showPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionName: "test coll",
},
Rsp: &milvuspb.ShowPartitionsResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_CollectionNotExists)
})
t.Run("failed to list collections by id", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &showPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: 1,
},
Rsp: &milvuspb.ShowPartitionsResponse{},
}
err := task.Execute(context.Background())
assert.Error(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_CollectionNotExists)
})
t.Run("success", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByIDFunc = func(ctx context.Context, collectionID typeutil.UniqueID, ts Timestamp) (*model.Collection, error) {
return &model.Collection{
CollectionID: collectionID,
Name: "test coll",
Partitions: []*model.Partition{
{
PartitionID: 1,
PartitionName: "test partition1",
},
{
PartitionID: 2,
PartitionName: "test partition2",
},
},
}, nil
}
core := newTestCore(withMeta(meta))
task := &showPartitionTask{
baseTask: baseTask{
core: core,
done: make(chan error, 1),
},
Req: &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: 1,
},
Rsp: &milvuspb.ShowPartitionsResponse{},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, task.Rsp.GetStatus().GetErrorCode())
assert.Equal(t, 2, len(task.Rsp.GetPartitionNames()))
})
}

View File

@ -25,6 +25,15 @@ type baseTask struct {
id UniqueID
}
func newBaseTask(ctx context.Context, core *Core) baseTask {
b := baseTask{
core: core,
done: make(chan error, 1),
}
b.SetCtx(ctx)
return b
}
func (b *baseTask) SetCtx(ctx context.Context) {
b.ctx = ctx
}