milvus/internal/indexnode/indexnode_service_test.go
congqixia 5973f5aeb9
Use go-api/v2 for milvus-proto (#24723)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2023-06-07 21:32:37 +08:00

407 lines
12 KiB
Go

package indexnode
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/stretchr/testify/assert"
)
func genStorageConfig() *indexpb.StorageConfig {
return &indexpb.StorageConfig{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKey: Params.MinioCfg.SecretAccessKey,
BucketName: Params.MinioCfg.BucketName,
RootPath: Params.MinioCfg.RootPath,
IAMEndpoint: Params.MinioCfg.IAMEndpoint,
UseSSL: Params.MinioCfg.UseSSL,
UseIAM: Params.MinioCfg.UseIAM,
}
}
//func TestIndexNodeSimple(t *testing.T) {
// in, err := NewMockIndexNodeComponent(context.TODO())
// assert.Nil(t, err)
// ctx := context.TODO()
// state, err := in.GetComponentStates(ctx)
// assert.Nil(t, err)
// assert.Equal(t, state.Status.ErrorCode, commonpb.ErrorCode_Success)
// assert.Equal(t, state.State.StateCode, commonpb.StateCode_Healthy)
//
// assert.Nil(t, err, err)
// var (
// clusterID = "test-milvus"
// idxFilePrefix = "mock_idx"
// buildID int64 = 1
// collID int64 = 101
// partID int64 = 201
// segID int64 = 301
// idxID int64 = 401
// idxName = "mock_idx"
// vecDim int64 = 8
// typeParams = []*commonpb.KeyValuePair{
// {
// Key: "dim",
// Value: fmt.Sprintf("%d", vecDim),
// },
// }
// indexParams = []*commonpb.KeyValuePair{
// {
// Key: "metric_type",
// Value: "L2",
// },
// {
// Key: "index_type",
// Value: "IVF_FLAT",
// },
// {
// Key: "nlist",
// Value: "128",
// },
// }
// mockChunkMgr = mockChunkMgrIns
// )
//
// mockChunkMgr.mockFieldData(1000, dim, collID, partID, segID)
// t.Run("create job", func(t *testing.T) {
// createReq := &indexpb.CreateJobRequest{
// ClusterID: clusterID,
// IndexFilePrefix: idxFilePrefix,
// BuildID: buildID,
// DataPaths: []string{dataPath(collID, partID, segID)},
// IndexVersion: 0,
// IndexID: idxID,
// IndexName: idxName,
// IndexParams: indexParams,
// TypeParams: typeParams,
// StorageConfig: genStorageConfig(),
// }
// status, err := in.CreateJob(ctx, createReq)
// assert.Nil(t, err)
// assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
// })
//
// t.Run(("query job"), func(t *testing.T) {
// queryJob := &indexpb.QueryJobsRequest{
// ClusterID: clusterID,
// BuildIDs: []int64{buildID},
// }
// timeout := time.After(time.Second * 10)
// var idxInfo *indexpb.IndexTaskInfo
// Loop:
// for {
// select {
// case <-timeout:
// t.Fatal("timeout for querying jobs")
// default:
// time.Sleep(1 * time.Millisecond)
// resp, err := in.QueryJobs(ctx, queryJob)
// assert.Nil(t, err)
// assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_Success)
// assert.Equal(t, resp.ClusterID, clusterID)
//
// for _, indexInfo := range resp.IndexInfos {
// if indexInfo.BuildID == buildID {
// if indexInfo.State == commonpb.IndexState_Finished {
// idxInfo = indexInfo
// break Loop
// }
// }
// }
//
// }
// }
//
// assert.NotNil(t, idxInfo)
// for _, idxFileID := range idxInfo.IndexFileKeys {
// idxFile := metautil.BuildSegmentIndexFilePath(mockChunkMgr.RootPath(), buildID, 0,
// partID, segID, idxFileID)
// _, ok := mockChunkMgr.indexedData.Load(idxFile)
// assert.True(t, ok)
// t.Logf("indexed file: %s", idxFile)
// }
//
// jobNumRet, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
// assert.Nil(t, err)
// assert.Equal(t, jobNumRet.Status.GetErrorCode(), commonpb.ErrorCode_Success)
// assert.Equal(t, jobNumRet.TotalJobNum, int64(0))
// assert.Equal(t, jobNumRet.InProgressJobNum, int64(0))
// assert.Equal(t, jobNumRet.EnqueueJobNum, int64(0))
// assert.Equal(t, jobNumRet.TaskSlots, int64(1))
// assert.Equal(t, len(jobNumRet.JobInfos), 1)
// jobInfo := jobNumRet.JobInfos[0]
//
// assert.True(t, jobInfo.Dim == 8)
// assert.True(t, jobInfo.NumRows == 1000)
// assert.True(t, jobInfo.PodID == 1)
// assert.ElementsMatch(t, jobInfo.IndexParams, indexParams)
// })
//
// t.Run("drop not exists jobs", func(t *testing.T) {
// status, err := in.DropJobs(ctx, &indexpb.DropJobsRequest{
// ClusterID: clusterID,
// BuildIDs: []int64{100001},
// })
// assert.Nil(t, err)
// assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
// })
//}
type testTask struct {
buildID int64
collID int64
partID int64
segID int64
idxID int64
dim int
rownum int
typeParams []*commonpb.KeyValuePair
idxParams []*commonpb.KeyValuePair
}
func TestIndexNodeComplex(t *testing.T) {
var (
clusterID string
buildID0 int64
collID0 int64 = 10000
partID0 int64 = 20000
segID0 int64 = 30000
idxID0 int64 = 40000
typesParamsLists = [][]*commonpb.KeyValuePair{
{{
Key: "dim",
Value: fmt.Sprintf("%d", 8),
}},
{{
Key: "dim",
Value: fmt.Sprintf("%d", 16),
}},
{{
Key: "dim",
Value: fmt.Sprintf("%d", 32),
}},
}
rowNums = []int{100, 1000, 10000}
dims = []int{8, 16, 32}
indexParams = []*commonpb.KeyValuePair{
{
Key: "nlist",
Value: "128",
},
{
Key: "metric_type",
Value: "L2",
},
{
Key: "index_type",
Value: "IVF_FLAT",
},
}
)
in, err := NewMockIndexNodeComponent(context.TODO())
assert.Nil(t, err)
ctx := context.TODO()
state, err := in.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, state.Status.ErrorCode, commonpb.ErrorCode_Success)
assert.Equal(t, state.State.StateCode, commonpb.StateCode_Healthy)
mockChunkMgr := mockChunkMgrIns
tasks := make([]*testTask, 0)
var i int64
t.Logf("preparing mock data...")
wg := sync.WaitGroup{}
for i = 0; i < 10; i++ {
task := &testTask{
buildID: i + buildID0,
collID: i + collID0,
partID: i + partID0,
segID: i + segID0,
idxID: i + idxID0,
typeParams: typesParamsLists[i%3],
dim: dims[i%3],
rownum: rowNums[i%3],
idxParams: indexParams,
}
wg.Add(1)
go func() {
defer wg.Done()
if rand.Float32() < 0.5 {
mockChunkMgr.mockFieldData(task.rownum, task.dim, task.collID, task.partID, task.segID)
}
}()
tasks = append(tasks, task)
}
wg.Wait()
t.Logf("start concurent testing")
testwg := sync.WaitGroup{}
for i := 0; i < len(tasks); i++ {
req := &indexpb.CreateJobRequest{
ClusterID: clusterID,
IndexFilePrefix: "mock_idx",
BuildID: tasks[i].buildID,
DataPaths: []string{dataPath(tasks[i].collID, tasks[i].partID, tasks[i].segID)},
IndexVersion: 0,
IndexID: tasks[i].idxID,
IndexName: fmt.Sprintf("idx%d", tasks[i].idxID),
IndexParams: tasks[i].idxParams,
TypeParams: tasks[i].typeParams,
StorageConfig: genStorageConfig(),
}
testwg.Add(1)
go func() {
defer testwg.Done()
status, err := in.CreateJob(ctx, req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
}()
testwg.Add(1)
go func(idx int) {
defer testwg.Done()
if rand.Float32() < 0.5 {
status, err := in.DropJobs(ctx, &indexpb.DropJobsRequest{
ClusterID: clusterID,
BuildIDs: []int64{tasks[idx].buildID},
})
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
}
}(i)
}
testwg.Wait()
timeout := time.After(time.Second * 30)
Loop:
for {
select {
case <-timeout:
t.Fatal("timeout testing")
default:
jobNumRet, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
assert.Nil(t, err)
assert.Equal(t, jobNumRet.Status.ErrorCode, commonpb.ErrorCode_Success)
if jobNumRet.TotalJobNum == 0 {
break Loop
}
time.Sleep(time.Second)
}
}
buildIDs := make([]int64, 0, len(tasks))
for _, task := range tasks {
buildIDs = append(buildIDs, task.buildID)
}
jobresp, err := in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
ClusterID: clusterID,
BuildIDs: buildIDs,
})
assert.Nil(t, err)
assert.Equal(t, jobresp.Status.ErrorCode, jobresp.Status.ErrorCode)
for _, job := range jobresp.IndexInfos {
task := tasks[job.BuildID-buildID0]
if job.State == commonpb.IndexState_Finished {
for _, idxFileID := range job.IndexFileKeys {
idxFile := metautil.BuildSegmentIndexFilePath(mockChunkMgr.RootPath(), task.buildID,
0, task.partID, task.segID, idxFileID)
_, ok := mockChunkMgr.indexedData.Load(idxFile)
assert.True(t, ok)
}
t.Logf("buildID: %d, indexFiles: %v", job.BuildID, job.IndexFileKeys)
} else {
_, ok := mockChunkMgr.indexedData.Load(dataPath(task.collID, task.partID, task.segID))
assert.False(t, ok)
}
}
// stop indexnode
assert.Nil(t, in.Stop())
node := in.(*mockIndexNodeComponent).IndexNode
assert.Equal(t, 0, len(node.tasks))
assert.Equal(t, commonpb.StateCode_Abnormal, node.stateCode.Load().(commonpb.StateCode))
}
func TestAbnormalIndexNode(t *testing.T) {
in, err := NewMockIndexNodeComponent(context.TODO())
assert.Nil(t, err)
assert.Nil(t, in.Stop())
ctx := context.TODO()
status, err := in.CreateJob(ctx, &indexpb.CreateJobRequest{})
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
qresp, err := in.QueryJobs(ctx, &indexpb.QueryJobsRequest{})
assert.Nil(t, err)
assert.Equal(t, qresp.Status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
status, err = in.DropJobs(ctx, &indexpb.DropJobsRequest{})
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
jobNumRsp, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
assert.Nil(t, err)
assert.Equal(t, jobNumRsp.Status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
metricsResp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.Nil(t, err)
assert.Equal(t, metricsResp.Status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
configurationResp, err := in.ShowConfigurations(ctx, &internalpb.ShowConfigurationsRequest{})
assert.Nil(t, err)
assert.Equal(t, configurationResp.Status.ErrorCode, commonpb.ErrorCode_NotReadyServe)
}
func TestGetMetrics(t *testing.T) {
var (
ctx = context.TODO()
metricReq, _ = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
)
in, err := NewMockIndexNodeComponent(ctx)
assert.Nil(t, err)
resp, err := in.GetMetrics(ctx, metricReq)
assert.Nil(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_Success)
t.Logf("Component: %s, Metrics: %s", resp.ComponentName, resp.Response)
}
func TestGetMetricsError(t *testing.T) {
var (
ctx = context.TODO()
)
in, err := NewMockIndexNodeComponent(ctx)
assert.Nil(t, err)
errReq := &milvuspb.GetMetricsRequest{
Request: `{"metric_typ": "system_info"}`,
}
resp, err := in.GetMetrics(ctx, errReq)
assert.Nil(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
unsupportedReq := &milvuspb.GetMetricsRequest{
Request: `{"metric_type": "application_info"}`,
}
resp, err = in.GetMetrics(ctx, unsupportedReq)
assert.Nil(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.Status.Reason, metricsinfo.MsgUnimplementedMetric)
}
func TestMockFieldData(t *testing.T) {
chunkMgr := NewMockChunkManager()
chunkMgr.mockFieldData(100000, 8, 0, 0, 1)
}