commit 3afab6ecb9 (parent 859ff62bc9)

    Add unittest and fix a bug in segment manager

    Signed-off-by: sunby <bingyi.sun@zilliz.com>
.devcontainer/devcontainer.json
@@ -7,7 +7,6 @@
     "remoteUser": "debugger",
     "remoteEnv": {"CCACHE_COMPILERCHECK":"content", "CCACHE_MAXSIZE": "2G", "CCACHE_COMPRESS": "1", "CCACHE_COMPRESSLEVEL": "5"},
     "extensions": [
-        "ms-vscode.cmake-tools",
         "ms-vscode.cpptools",
         "golang.go"
     ]
.env
@@ -3,6 +3,6 @@ ARCH=amd64
 UBUNTU=18.04
 DATE_VERSION=20201120-092740
 LATEST_DATE_VERSION=latest
-PULSAR_ADDRESS=pulsar://pulsar:6650
-ETCD_ADDRESS=etcd:2379
+PULSAR_ADDRESS=pulsar://localhost:6650
+ETCD_ADDRESS=localhost:2379
 MASTER_ADDRESS=localhost:53100
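Both addresses switch from Docker-network hostnames (pulsar, etcd) to localhost, consistent with the move to host networking in the compose files below. As a hedged illustration only (the helper and its fallbacks are mine, not part of this commit), a consumer of these variables might read them like this:

package main

import (
	"fmt"
	"os"
)

// getEnvOrDefault is a hypothetical helper: it returns the value of an
// environment variable, falling back to a default when it is unset.
func getEnvOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	// With the updated .env, both services are expected on localhost.
	pulsarAddr := getEnvOrDefault("PULSAR_ADDRESS", "pulsar://localhost:6650")
	etcdAddr := getEnvOrDefault("ETCD_ADDRESS", "localhost:2379")
	fmt.Println("pulsar:", pulsarAddr, "etcd:", etcdAddr)
}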
.github/workflows/main.yaml
@@ -57,7 +57,7 @@ jobs:
       - name: Start Service
         shell: bash
         run: |
-          cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose -p milvus-distributed up -d
+          cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose up -d
       - name: Build and UnitTest
         env:
           CHECK_BUILDER: "1"
deployments/docker/docker-compose.yml
@@ -8,8 +8,6 @@ services:
       - "2379:2379"
       - "2380:2380"
       - "4001:4001"
-    networks:
-      - milvus

   pulsar:
     image: apachepulsar/pulsar:latest
@@ -17,11 +15,6 @@ services:
     ports:
       - "6650:6650"
      - "18080:8080"
-    networks:
-      - milvus
-
-networks:
-  milvus:

 # pd0:
 #   image: pingcap/pd:latest
docker-compose.yml
@@ -10,6 +10,7 @@ x-ccache: &ccache
 services:
   ubuntu:
     image: ${REPO}:${ARCH}-ubuntu${UBUNTU}-${DATE_VERSION}
+    network_mode: "host"
     build:
       context: .
       dockerfile: build/docker/env/cpu/ubuntu${UBUNTU}/Dockerfile
@@ -28,8 +29,6 @@ services:
     command: &ubuntu-command >
       /bin/bash -c "
       make check-proto-product && make verifiers && make unittest"
-    networks:
-      - milvus

   gdbserver:
     image: ${REPO}:${ARCH}-ubuntu${UBUNTU}-${DATE_VERSION}
@@ -53,8 +52,3 @@ services:
     ports:
       - "7776:22"
       - "7777:7777"
-    networks:
-      - milvus
-
-networks:
-  milvus:
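With network_mode: "host" the build container shares the host's network namespace, and the private milvus bridge network is dropped from both compose files, so etcd and Pulsar started by the deployment compose file are reachable at localhost:2379 and localhost:6650, which is exactly what the updated .env and the new tests assume. A small sketch, not part of the commit, for sanity-checking both endpoints before running the unit tests (addresses taken from .env):

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Addresses taken from the updated .env; adjust if yours differ.
	for _, addr := range []string{"localhost:2379", "localhost:6650"} {
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if err != nil {
			fmt.Printf("%s: not reachable (%v)\n", addr, err)
			continue
		}
		conn.Close()
		fmt.Printf("%s: ok\n", addr)
	}
}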
internal/master/segment_manager.go
@@ -86,15 +86,14 @@ func (segMgr *SegmentManager) closeSegment(segMeta *etcdpb.SegmentMeta) error {
 	if segMeta.GetCloseTime() == 0 {
 		// close the segment and remove from collStatus
 		collStatus, ok := segMgr.collStatus[segMeta.GetCollectionID()]
-		if !ok {
-			return errors.Errorf("Can not find the status of collection %d", segMeta.GetCollectionID())
-		}
+		if ok {
 		openedSegments := collStatus.openedSegments
 		for i, openedSegID := range openedSegments {
 			if openedSegID == segMeta.SegmentID {
 				openedSegments[i] = openedSegments[len(openedSegments)-1]
 				collStatus.openedSegments = openedSegments[:len(openedSegments)-1]
-				return nil
+				break
+			}
 			}
 		}
 		ts, err := segMgr.globalTSOAllocator()
@@ -108,7 +107,7 @@ func (segMgr *SegmentManager) closeSegment(segMeta *etcdpb.SegmentMeta) error {
 	if err != nil {
 		return err
 	}
-	return errors.Errorf("The segment %d is not opened in collection %d", segMeta.SegmentID, segMeta.GetCollectionID())
+	return nil
 }

 func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
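This is the bug fix: the old closeSegment returned an error when the collection status was missing, and when it did find the segment in the opened list it returned nil immediately, before the close timestamp allocated further down was ever persisted; segments absent from the list fell through to the "not opened" error. The new code prunes the in-memory list as a best-effort step, always records the close time, and returns nil at the end. Below is a minimal standalone sketch of the fixed control flow, with simplified stand-ins for the real SegmentManager fields (all names here are illustrative):

package main

import "fmt"

type collectionStatus struct {
	openedSegments []int64
}

// closeSegment mirrors the fixed logic: prune the opened-segment list if we
// are tracking the collection, but never fail just because the segment was
// not found there; always record the close timestamp afterwards.
func closeSegment(collStatus map[int64]*collectionStatus, collID, segID int64, setCloseTime func(int64) error) error {
	if status, ok := collStatus[collID]; ok {
		segs := status.openedSegments
		for i, id := range segs {
			if id == segID {
				// Swap-remove: O(1) deletion; ordering is not preserved.
				segs[i] = segs[len(segs)-1]
				status.openedSegments = segs[:len(segs)-1]
				break
			}
		}
	}
	// Previously this step was skipped when the early return fired.
	return setCloseTime(segID)
}

func main() {
	status := map[int64]*collectionStatus{1001: {openedSegments: []int64{100, 101}}}
	err := closeSegment(status, 1001, 100, func(segID int64) error {
		fmt.Printf("close time recorded for segment %d\n", segID)
		return nil
	})
	fmt.Println(err, status[1001].openedSegments)
}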
internal/master/segment_manager_test.go
@@ -1,11 +1,19 @@
 package master

 import (
+	"context"
 	"log"
 	"sync/atomic"
 	"testing"
 	"time"

+	"github.com/gogo/protobuf/proto"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
+	"google.golang.org/grpc"
+
 	"github.com/stretchr/testify/assert"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
@@ -23,6 +31,8 @@ var collName = "coll_segmgr_test"
 var collID = int64(1001)
 var partitionTag = "test"
 var kvBase *kv.EtcdKV
+var master *Master
+var masterCancelFunc context.CancelFunc

 func setup() {
 	Params.Init()
@@ -32,15 +42,24 @@ func setup() {
 	if err != nil {
 		panic(err)
 	}
-	rootpath := "/etcd/test/root"
-	kvBase = kv.NewEtcdKV(cli, rootpath)
+	rootPath := "/test/root"
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
+	if err != nil {
+		panic(err)
+	}
+	kvBase = kv.NewEtcdKV(cli, rootPath)
 	tmpMt, err := NewMetaTable(kvBase)
 	if err != nil {
 		panic(err)
 	}
 	mt = tmpMt
 	if mt.HasCollection(collID) {
-		mt.DeleteCollection(collID)
+		err := mt.DeleteCollection(collID)
+		if err != nil {
+			panic(err)
+		}
 	}
 	err = mt.AddCollection(&pb.CollectionMeta{
 		ID: collID,
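setup() now clears the test root prefix in etcd before building the meta table, so state left behind by a previous run cannot leak into the current one, and the previously ignored DeleteCollection error is checked. A standalone sketch of the same prefix-cleanup pattern with the clientv3 API (endpoint and prefix here are placeholders):

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 2 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// WithPrefix deletes every key under the test root in a single call.
	resp, err := cli.Delete(ctx, "/test/root", clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("removed %d stale keys", resp.Deleted)
}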
@@ -141,7 +160,7 @@ func TestSegmentManager_AssignSegmentID(t *testing.T) {
 	newReqs[0].Count = 1000000
 	_, err = segMgr.AssignSegmentID(newReqs)
 	assert.Error(t, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
-		1000000, Params.DefaultRecordSize()*1000000), err)
+		1000000, 1024*1000000), err)
 }

 func TestSegmentManager_SegmentStats(t *testing.T) {
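The hard-coded 1024 mirrors DefaultRecordSize in the test Option introduced below: a 1,000,000-row request needs about 1,000,000 × 1,024 = 1,024,000,000 bytes, roughly double the SegmentThreshold of 536,870,912 bytes (512 MiB), so the assignment is expected to fail.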
@@ -162,7 +181,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
 		MsgType: internalpb.MsgType_kQueryNodeSegStats,
 		PeerID:  1,
 		SegStats: []*internalpb.SegmentStats{
-			{SegmentID: 100, MemorySize: 25000 * Params.DefaultRecordSize(), NumRows: 25000, RecentlyModified: true},
+			{SegmentID: 100, MemorySize: 2500000, NumRows: 25000, RecentlyModified: true},
 		},
 	}
 	baseMsg := msgstream.BaseMsg{
@@ -183,19 +202,159 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
 	err = segMgr.HandleQueryNodeMsgPack(&msgPack)
 	assert.Nil(t, err)

-	time.Sleep(1 * time.Second)
 	segMeta, _ := mt.GetSegmentByID(100)
 	assert.Equal(t, int64(100), segMeta.SegmentID)
-	assert.Equal(t, 25000*Params.DefaultRecordSize(), segMeta.MemSize)
+	assert.Equal(t, int64(2500000), segMeta.MemSize)
 	assert.Equal(t, int64(25000), segMeta.NumRows)

 	// close segment
-	stats.SegStats[0].NumRows = 520000
-	stats.SegStats[0].MemorySize = 520000 * Params.DefaultRecordSize()
+	stats.SegStats[0].NumRows = 600000
+	stats.SegStats[0].MemorySize = 600000000
 	err = segMgr.HandleQueryNodeMsgPack(&msgPack)
 	assert.Nil(t, err)
-	time.Sleep(1 * time.Second)
 	segMeta, _ = mt.GetSegmentByID(100)
 	assert.Equal(t, int64(100), segMeta.SegmentID)
-	assert.NotEqual(t, 0, segMeta.CloseTime)
+	assert.NotEqual(t, uint64(0), segMeta.CloseTime)
+}
+
+func startupMaster() {
+	Params.Init()
+	etcdAddress := Params.EtcdAddress()
+	rootPath := "/test/root"
+	ctx, cancel := context.WithCancel(context.TODO())
+	masterCancelFunc = cancel
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
+	if err != nil {
+		panic(err)
+	}
+	_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
+	if err != nil {
+		panic(err)
+	}
+
+	opt := &Option{
+		KVRootPath:            "/test/root/kv",
+		MetaRootPath:          "/test/root/meta",
+		EtcdAddr:              []string{etcdAddress},
+		PulsarAddr:            "pulsar://localhost:6650",
+		ProxyIDs:              []typeutil.UniqueID{1, 2},
+		PulsarProxyChannels:   []string{"proxy1", "proxy2"},
+		PulsarProxySubName:    "proxyTopics",
+		SoftTTBInterval:       300,
+		WriteIDs:              []typeutil.UniqueID{3, 4},
+		PulsarWriteChannels:   []string{"write3", "write4"},
+		PulsarWriteSubName:    "writeTopics",
+		PulsarDMChannels:      []string{"dm0", "dm1"},
+		PulsarK2SChannels:     []string{"k2s0", "k2s1"},
+		DefaultRecordSize:     1024,
+		MinimumAssignSize:     1048576,
+		SegmentThreshold:      536870912,
+		SegmentExpireDuration: 2000,
+		NumOfChannel:          5,
+		NumOfQueryNode:        3,
+		StatsChannels:         "statistic",
+	}
+
+	master, err = CreateServer(ctx, opt)
+	if err != nil {
+		panic(err)
+	}
+	err = master.Run(10013)
+
+	if err != nil {
+		panic(err)
+	}
+}
+
+func shutdownMaster() {
+	masterCancelFunc()
+	master.Close()
+}
+
+func TestSegmentManager_RPC(t *testing.T) {
+	startupMaster()
+	defer shutdownMaster()
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	dialContext, err := grpc.DialContext(ctx, "127.0.0.1:10013", grpc.WithInsecure(), grpc.WithBlock())
+	assert.Nil(t, err)
+	defer dialContext.Close()
+	client := masterpb.NewMasterClient(dialContext)
+	schema := &schemapb.CollectionSchema{
+		Name:        collName,
+		Description: "test coll",
+		AutoID:      false,
+		Fields:      []*schemapb.FieldSchema{},
+	}
+	schemaBytes, err := proto.Marshal(schema)
+	assert.Nil(t, err)
+	_, err = client.CreateCollection(ctx, &internalpb.CreateCollectionRequest{
+		MsgType:   internalpb.MsgType_kCreateCollection,
+		ReqID:     1,
+		Timestamp: 100,
+		ProxyID:   1,
+		Schema:    &commonpb.Blob{Value: schemaBytes},
+	})
+	assert.Nil(t, err)
+	_, err = client.CreatePartition(ctx, &internalpb.CreatePartitionRequest{
+		MsgType:   internalpb.MsgType_kCreatePartition,
+		ReqID:     2,
+		Timestamp: 101,
+		ProxyID:   1,
+		PartitionName: &servicepb.PartitionName{
+			CollectionName: collName,
+			Tag:            partitionTag,
+		},
+	})
+	assert.Nil(t, err)
+
+	resp, err := client.AssignSegmentID(ctx, &internalpb.AssignSegIDRequest{
+		PeerID: 1,
+		Role:   internalpb.PeerRole_Proxy,
+		PerChannelReq: []*internalpb.SegIDRequest{
+			{Count: 10000, ChannelID: 0, CollName: collName, PartitionTag: partitionTag},
+		},
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, commonpb.ErrorCode_SUCCESS, resp.Status.ErrorCode)
+	assignments := resp.GetPerChannelAssignment()
+	assert.Equal(t, 1, len(assignments))
+	assert.Equal(t, collName, assignments[0].CollName)
+	assert.Equal(t, partitionTag, assignments[0].PartitionTag)
+	assert.Equal(t, int32(0), assignments[0].ChannelID)
+	assert.Equal(t, uint32(10000), assignments[0].Count)
+
+	// test stats
+	segID := assignments[0].SegID
+	ms := msgstream.NewPulsarMsgStream(ctx, 1024)
+	ms.SetPulsarClient("pulsar://localhost:6650")
+	ms.CreatePulsarProducers([]string{"statistic"})
+	ms.Start()
+	defer ms.Close()
+
+	err = ms.Produce(&msgstream.MsgPack{
+		BeginTs: 102,
+		EndTs:   104,
+		Msgs: []msgstream.TsMsg{
+			&msgstream.QueryNodeSegStatsMsg{
+				QueryNodeSegStats: internalpb.QueryNodeSegStats{
+					MsgType: internalpb.MsgType_kQueryNodeSegStats,
+					PeerID:  1,
+					SegStats: []*internalpb.SegmentStats{
+						{SegmentID: segID, MemorySize: 600000000, NumRows: 1000000, RecentlyModified: true},
+					},
+				},
+				BaseMsg: msgstream.BaseMsg{
+					HashValues: []int32{0},
+				},
+			},
+		},
+	})
+	assert.Nil(t, err)
+
+	time.Sleep(500 * time.Millisecond)
+	segMeta, err := master.metaTable.GetSegmentByID(segID)
+	assert.Nil(t, err)
+	assert.NotEqual(t, uint64(0), segMeta.GetCloseTime())
+	assert.Equal(t, int64(600000000), segMeta.GetMemSize())
 }
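The commit drops the fixed time.Sleep(1 * time.Second) calls from TestSegmentManager_SegmentStats, though TestSegmentManager_RPC still waits a flat 500 ms for the stats message to be consumed. A hedged sketch of a deadline-bounded poll such waits could use instead; this is a hand-rolled stand-in for testify's assert.Eventually, and the names are mine:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntil polls cond every tick until it returns true or waitFor elapses.
// Compared to a fixed sleep, it returns as soon as the condition holds and
// fails loudly when it never does.
func waitUntil(cond func() bool, waitFor, tick time.Duration) error {
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(tick)
	}
	return errors.New("condition not met before deadline")
}

func main() {
	start := time.Now()
	err := waitUntil(func() bool { return time.Since(start) > 300*time.Millisecond },
		2*time.Second, 50*time.Millisecond)
	fmt.Println(err)
}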