mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>
991 lines
32 KiB
Go
991 lines
32 KiB
Go
package rootcoord
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
|
|
|
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
|
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/funcutil"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/allocator"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
|
|
|
"github.com/milvus-io/milvus/api/commonpb"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/milvus-io/milvus/api/milvuspb"
|
|
)
|
|
|
|
func TestRootCoord_CreateCollection(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_DropCollection(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.DropCollection(ctx, &milvuspb.DropCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropCollection(ctx, &milvuspb.DropCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropCollection(ctx, &milvuspb.DropCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropCollection(ctx, &milvuspb.DropCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_DescribeCollection(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to get collection by name", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64")).
|
|
Return(nil, errors.New("error mock GetCollectionByName"))
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: "test"})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to get collection by id", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByID",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("int64"),
|
|
mock.AnythingOfType("uint64")).
|
|
Return(nil, errors.New("error mock GetCollectionByID"))
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionID: 100})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64")).
|
|
Return(&model.Collection{CollectionID: 100}, nil)
|
|
meta.On("ListAliasesByID",
|
|
mock.AnythingOfType("int64")).
|
|
Return([]string{"alias1", "alias2"})
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: "test"})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.Equal(t, UniqueID(100), resp.GetCollectionID())
|
|
assert.ElementsMatch(t, []string{"alias1", "alias2"}, resp.GetAliases())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_HasCollection(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64"),
|
|
).Return(func(ctx context.Context, collectionName string, ts Timestamp) *model.Collection {
|
|
if ts == typeutil.MaxTimestamp {
|
|
return &model.Collection{}
|
|
}
|
|
return nil
|
|
}, func(ctx context.Context, collectionName string, ts Timestamp) error {
|
|
if ts == typeutil.MaxTimestamp {
|
|
return nil
|
|
}
|
|
return errors.New("error mock GetCollectionByName")
|
|
})
|
|
|
|
c := newTestCore(withHealthyCode(), withMeta(meta))
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.True(t, resp.GetValue())
|
|
|
|
resp, err = c.HasCollection(ctx, &milvuspb.HasCollectionRequest{TimeStamp: typeutil.MaxTimestamp})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.True(t, resp.GetValue())
|
|
|
|
resp, err = c.HasCollection(ctx, &milvuspb.HasCollectionRequest{TimeStamp: 100})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.False(t, resp.GetValue())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_ShowCollections(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("ListCollections",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("uint64")).
|
|
Return(func(ctx context.Context, ts Timestamp) []*model.Collection {
|
|
if ts == typeutil.MaxTimestamp {
|
|
return []*model.Collection{
|
|
{
|
|
CollectionID: 100,
|
|
Name: "test",
|
|
State: pb.CollectionState_CollectionCreated,
|
|
},
|
|
}
|
|
}
|
|
return nil
|
|
}, func(ctx context.Context, ts Timestamp) error {
|
|
if ts == typeutil.MaxTimestamp {
|
|
return nil
|
|
}
|
|
return errors.New("error mock ListCollections")
|
|
})
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.ElementsMatch(t, []int64{100}, resp.GetCollectionIds())
|
|
assert.ElementsMatch(t, []string{"test"}, resp.GetCollectionNames())
|
|
|
|
resp, err = c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{TimeStamp: typeutil.MaxTimestamp})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.ElementsMatch(t, []int64{100}, resp.GetCollectionIds())
|
|
assert.ElementsMatch(t, []string{"test"}, resp.GetCollectionNames())
|
|
|
|
resp, err = c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{TimeStamp: 10000})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_CreatePartition(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_DropPartition(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.DropPartition(ctx, &milvuspb.DropPartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropPartition(ctx, &milvuspb.DropPartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropPartition(ctx, &milvuspb.DropPartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropPartition(ctx, &milvuspb.DropPartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_HasPartition(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64"),
|
|
).Return(func(ctx context.Context, collectionName string, ts Timestamp) *model.Collection {
|
|
if collectionName == "test1" {
|
|
return &model.Collection{Partitions: []*model.Partition{{PartitionName: "test_partition"}}}
|
|
}
|
|
return nil
|
|
}, func(ctx context.Context, collectionName string, ts Timestamp) error {
|
|
if collectionName == "test1" {
|
|
return nil
|
|
}
|
|
return errors.New("error mock GetCollectionByName")
|
|
})
|
|
|
|
c := newTestCore(withHealthyCode(), withMeta(meta))
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "error_case"})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
|
|
resp, err = c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "test1", PartitionName: "test_partition"})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.True(t, resp.GetValue())
|
|
|
|
resp, err = c.HasPartition(ctx, &milvuspb.HasPartitionRequest{CollectionName: "test1", PartitionName: "non_exist"})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.False(t, resp.GetValue())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_ShowPartitions(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to get collection by name", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64")).
|
|
Return(nil, errors.New("error mock GetCollectionByName"))
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionName: "test"})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to get collection by id", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByID",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("int64"),
|
|
mock.AnythingOfType("uint64")).
|
|
Return(nil, errors.New("error mock GetCollectionByID"))
|
|
|
|
c := newTestCore(withHealthyCode(),
|
|
withMeta(meta))
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionID: 100})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
meta := mockrootcoord.NewIMetaTable(t)
|
|
meta.On("GetCollectionByName",
|
|
mock.Anything, // context.Context
|
|
mock.AnythingOfType("string"),
|
|
mock.AnythingOfType("uint64"),
|
|
).Return(&model.Collection{Partitions: []*model.Partition{{
|
|
PartitionName: "test_partition",
|
|
PartitionID: 102,
|
|
}}}, nil)
|
|
|
|
c := newTestCore(withHealthyCode(), withMeta(meta))
|
|
ctx := context.Background()
|
|
|
|
resp, err := c.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{CollectionName: "test"})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.ElementsMatch(t, []string{"test_partition"}, resp.GetPartitionNames())
|
|
assert.ElementsMatch(t, []int64{102}, resp.GetPartitionIDs())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_CreateAlias(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_DropAlias(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_AlterAlias(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
c := newTestCore(withAbnormalCode())
|
|
ctx := context.Background()
|
|
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to add task", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidScheduler())
|
|
|
|
ctx := context.Background()
|
|
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to execute", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withTaskFailScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case, everything is ok", func(t *testing.T) {
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidScheduler())
|
|
ctx := context.Background()
|
|
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_AllocTimestamp(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to allocate ts", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidTsoAllocator())
|
|
resp, err := c.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
alloc := newMockTsoAllocator()
|
|
count := uint32(10)
|
|
ts := Timestamp(100)
|
|
alloc.GenerateTSOF = func(count uint32) (uint64, error) {
|
|
// end ts
|
|
return ts, nil
|
|
}
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withTsoAllocator(alloc))
|
|
resp, err := c.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{Count: count})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
// begin ts
|
|
assert.Equal(t, ts-uint64(count)+1, resp.GetTimestamp())
|
|
assert.Equal(t, count, resp.GetCount())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_AllocID(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.AllocID(ctx, &rootcoordpb.AllocIDRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to allocate id", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidIDAllocator())
|
|
resp, err := c.AllocID(ctx, &rootcoordpb.AllocIDRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
alloc := newMockIDAllocator()
|
|
id := UniqueID(100)
|
|
alloc.AllocF = func(count uint32) (allocator.UniqueID, allocator.UniqueID, error) {
|
|
return id, id + int64(count), nil
|
|
}
|
|
count := uint32(10)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withIDAllocator(alloc))
|
|
resp, err := c.AllocID(ctx, &rootcoordpb.AllocIDRequest{Count: count})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.Equal(t, id, resp.GetID())
|
|
assert.Equal(t, count, resp.GetCount())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_UpdateChannelTimeTick(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.UpdateChannelTimeTick(ctx, &internalpb.ChannelTimeTickMsg{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("invalid msg type", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode())
|
|
resp, err := c.UpdateChannelTimeTick(ctx, &internalpb.ChannelTimeTickMsg{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("invalid msg", func(t *testing.T) {
|
|
defer cleanTestEnv()
|
|
|
|
ticker := newRocksMqTtSynchronizer()
|
|
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withTtSynchronizer(ticker))
|
|
|
|
// the length of channel names & timestamps mismatch.
|
|
resp, err := c.UpdateChannelTimeTick(ctx, &internalpb.ChannelTimeTickMsg{
|
|
Base: &commonpb.MsgBase{
|
|
MsgType: commonpb.MsgType_TimeTick,
|
|
},
|
|
ChannelNames: []string{funcutil.GenRandomStr()},
|
|
Timestamps: []uint64{},
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
defer cleanTestEnv()
|
|
|
|
source := int64(20220824)
|
|
ts := Timestamp(100)
|
|
defaultTs := Timestamp(101)
|
|
|
|
ticker := newRocksMqTtSynchronizer()
|
|
ticker.addSession(&sessionutil.Session{ServerID: source})
|
|
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withTtSynchronizer(ticker))
|
|
|
|
resp, err := c.UpdateChannelTimeTick(ctx, &internalpb.ChannelTimeTickMsg{
|
|
Base: &commonpb.MsgBase{
|
|
SourceID: source,
|
|
MsgType: commonpb.MsgType_TimeTick,
|
|
},
|
|
ChannelNames: []string{funcutil.GenRandomStr()},
|
|
Timestamps: []uint64{ts},
|
|
DefaultTimestamp: defaultTs,
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_InvalidateCollectionMetaCache(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to invalidate cache", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withInvalidProxyManager())
|
|
resp, err := c.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withValidProxyManager())
|
|
resp, err := c.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_ShowConfigurations(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.ShowConfigurations(ctx, &internalpb.ShowConfigurationsRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
Params.InitOnce()
|
|
|
|
pattern := "Port"
|
|
req := &internalpb.ShowConfigurationsRequest{
|
|
Base: &commonpb.MsgBase{
|
|
MsgID: rand.Int63(),
|
|
},
|
|
Pattern: pattern,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode())
|
|
resp, err := c.ShowConfigurations(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
assert.Equal(t, 1, len(resp.GetConfiguations()))
|
|
assert.Equal(t, "rootcoord.port", resp.GetConfiguations()[0].Key)
|
|
})
|
|
}
|
|
|
|
func TestRootCoord_GetMetrics(t *testing.T) {
|
|
t.Run("not healthy", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := newTestCore(withAbnormalCode())
|
|
resp, err := c.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("failed to parse metric type", func(t *testing.T) {
|
|
req := &milvuspb.GetMetricsRequest{
|
|
Request: "invalid request",
|
|
}
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode())
|
|
resp, err := c.GetMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("unsupported metric type", func(t *testing.T) {
|
|
// unsupported metric type
|
|
unsupportedMetricType := "unsupported"
|
|
req, err := metricsinfo.ConstructRequestByMetricType(unsupportedMetricType)
|
|
assert.NoError(t, err)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode())
|
|
resp, err := c.GetMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
})
|
|
|
|
t.Run("normal case", func(t *testing.T) {
|
|
systemInfoMetricType := metricsinfo.SystemInfoMetrics
|
|
req, err := metricsinfo.ConstructRequestByMetricType(systemInfoMetricType)
|
|
assert.NoError(t, err)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withMetricsCacheManager())
|
|
resp, err := c.GetMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
})
|
|
|
|
t.Run("get system info metrics from cache", func(t *testing.T) {
|
|
systemInfoMetricType := metricsinfo.SystemInfoMetrics
|
|
req, err := metricsinfo.ConstructRequestByMetricType(systemInfoMetricType)
|
|
assert.NoError(t, err)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withMetricsCacheManager())
|
|
c.metricsCacheManager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{
|
|
Status: succStatus(),
|
|
Response: "cached response",
|
|
ComponentName: "cached component",
|
|
})
|
|
resp, err := c.GetMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
})
|
|
|
|
t.Run("get system info metrics, cache miss", func(t *testing.T) {
|
|
systemInfoMetricType := metricsinfo.SystemInfoMetrics
|
|
req, err := metricsinfo.ConstructRequestByMetricType(systemInfoMetricType)
|
|
assert.NoError(t, err)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withMetricsCacheManager())
|
|
c.metricsCacheManager.InvalidateSystemInfoMetrics()
|
|
resp, err := c.GetMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
})
|
|
|
|
t.Run("get system info metrics", func(t *testing.T) {
|
|
systemInfoMetricType := metricsinfo.SystemInfoMetrics
|
|
req, err := metricsinfo.ConstructRequestByMetricType(systemInfoMetricType)
|
|
assert.NoError(t, err)
|
|
ctx := context.Background()
|
|
c := newTestCore(withHealthyCode(),
|
|
withMetricsCacheManager())
|
|
resp, err := c.getSystemInfoMetrics(ctx, req)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
})
|
|
}
|
|
|
|
func TestCore_Import(t *testing.T) {
|
|
|
|
}
|
|
|
|
func TestCore_GetImportState(t *testing.T) {
|
|
|
|
}
|
|
|
|
func TestCore_ListImportTasks(t *testing.T) {
|
|
|
|
}
|
|
|
|
func TestCore_ReportImport(t *testing.T) {
|
|
|
|
}
|
|
|
|
func TestCore_CountCompleteIndex(t *testing.T) {
|
|
|
|
}
|
|
|
|
func TestCore_Rbac(t *testing.T) {
|
|
ctx := context.Background()
|
|
c := &Core{
|
|
ctx: ctx,
|
|
}
|
|
|
|
// not healthy.
|
|
c.stateCode.Store(internalpb.StateCode_Abnormal)
|
|
|
|
{
|
|
resp, err := c.CreateRole(ctx, &milvuspb.CreateRoleRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.DropRole(ctx, &milvuspb.DropRoleRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.SelectRole(ctx, &milvuspb.SelectRoleRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.SelectUser(ctx, &milvuspb.SelectUserRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
}
|
|
|
|
{
|
|
resp, err := c.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
|
|
assert.NotNil(t, err)
|
|
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
|
}
|
|
}
|
|
|
|
func TestCore_sendMinDdlTsAsTt(t *testing.T) {
|
|
ticker := newRocksMqTtSynchronizer()
|
|
ddlManager := newMockDdlTsLockManager()
|
|
ddlManager.GetMinDdlTsFunc = func() Timestamp {
|
|
return 100
|
|
}
|
|
c := newTestCore(
|
|
withTtSynchronizer(ticker),
|
|
withDdlTsLockManager(ddlManager))
|
|
c.sendMinDdlTsAsTt() // no session.
|
|
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
|
|
c.sendMinDdlTsAsTt()
|
|
}
|
|
|
|
func TestCore_startTimeTickLoop(t *testing.T) {
|
|
ticker := newRocksMqTtSynchronizer()
|
|
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
|
|
ddlManager := newMockDdlTsLockManager()
|
|
ddlManager.GetMinDdlTsFunc = func() Timestamp {
|
|
return 100
|
|
}
|
|
c := newTestCore(
|
|
withTtSynchronizer(ticker),
|
|
withDdlTsLockManager(ddlManager))
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.ctx = ctx
|
|
Params.ProxyCfg.TimeTickInterval = time.Millisecond
|
|
c.wg.Add(1)
|
|
go c.startTimeTickLoop()
|
|
|
|
time.Sleep(time.Millisecond * 4)
|
|
cancel()
|
|
c.wg.Wait()
|
|
}
|