From f360ed700414547703cd40b01df751de840999ab Mon Sep 17 00:00:00 2001 From: sunby Date: Mon, 21 Dec 2020 09:56:35 +0800 Subject: [PATCH] Add unittests Signed-off-by: sunby --- internal/master/segment_assigner_test.go | 139 +++++++++++++++++++++++ internal/master/stats_processor_test.go | 102 +++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 internal/master/segment_assigner_test.go create mode 100644 internal/master/stats_processor_test.go diff --git a/internal/master/segment_assigner_test.go b/internal/master/segment_assigner_test.go new file mode 100644 index 0000000000..8d5aef4680 --- /dev/null +++ b/internal/master/segment_assigner_test.go @@ -0,0 +1,139 @@ +package master + +import ( + "context" + "sync/atomic" + "testing" + "time" + + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "go.etcd.io/etcd/clientv3" + + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" +) + +func TestSegmentManager_AssignSegmentID(t *testing.T) { + Init() + Params.TopicNum = 5 + Params.QueryNodeNum = 3 + Params.SegmentSize = 536870912 / 1024 / 1024 + Params.SegmentSizeFactor = 0.75 + Params.DefaultRecordSize = 1024 + Params.MinSegIDAssignCnt = 1048576 / 1024 + Params.SegIDAssignExpiration = 2000 + collName := "coll_segmgr_test" + collID := int64(1001) + partitionTag := "test" + etcdAddress := Params.EtcdAddress + + var cnt int64 + globalTsoAllocator := func() (Timestamp, error) { + val := atomic.AddInt64(&cnt, 1) + phy := time.Now().UnixNano() / int64(time.Millisecond) + ts := tsoutil.ComposeTS(phy, val) + return ts, nil + } + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) + assert.Nil(t, err) + rootPath := "/test/root" + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + _, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix()) + assert.Nil(t, err) + kvBase := etcdkv.NewEtcdKV(cli, rootPath) + defer kvBase.Close() + mt, err := NewMetaTable(kvBase) + assert.Nil(t, err) + err = mt.AddCollection(&pb.CollectionMeta{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Name: collName, + }, + CreateTime: 0, + SegmentIDs: []UniqueID{}, + PartitionTags: []string{}, + }) + assert.Nil(t, err) + err = mt.AddPartition(collID, partitionTag) + assert.Nil(t, err) + timestamp, err := globalTsoAllocator() + assert.Nil(t, err) + err = mt.AddSegment(&pb.SegmentMeta{ + SegmentID: 100, + CollectionID: collID, + PartitionTag: partitionTag, + ChannelStart: 0, + ChannelEnd: 1, + OpenTime: timestamp, + }) + assert.Nil(t, err) + proxySyncChan := make(chan *msgstream.TimeTickMsg) + + segAssigner := NewSegmentAssigner(ctx, mt, globalTsoAllocator, proxySyncChan) + + segAssigner.Start() + defer segAssigner.Close() + + _, err = segAssigner.Assign(100, 100) + assert.NotNil(t, err) + err = segAssigner.OpenSegment(100, 100000) + assert.Nil(t, err) + result, err := segAssigner.Assign(100, 10000) + assert.Nil(t, err) + assert.True(t, result) + + result, err = segAssigner.Assign(100, 95000) + assert.Nil(t, err) + assert.False(t, result) + + time.Sleep(2 * time.Second) + timestamp, err = globalTsoAllocator() + assert.Nil(t, err) + tickMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: timestamp, EndTimestamp: timestamp, HashValues: []uint32{}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + MsgType: internalpb.MsgType_kTimeTick, PeerID: 1, Timestamp: timestamp, + }, + } + + proxySyncChan <- tickMsg + time.Sleep(500 * time.Millisecond) + result, err = segAssigner.Assign(100, 100000) + assert.Nil(t, err) + assert.True(t, result) + + err = segAssigner.CloseSegment(100) + assert.Nil(t, err) + _, err = segAssigner.Assign(100, 100) + assert.NotNil(t, err) + + err = mt.AddSegment(&pb.SegmentMeta{ + SegmentID: 200, + CollectionID: collID, + PartitionTag: partitionTag, + ChannelStart: 1, + ChannelEnd: 1, + OpenTime: 100, + NumRows: 10000, + MemSize: 100, + }) + assert.Nil(t, err) + + err = segAssigner.OpenSegment(200, 20000) + assert.Nil(t, err) + result, err = segAssigner.Assign(200, 10001) + assert.Nil(t, err) + assert.False(t, result) + result, err = segAssigner.Assign(200, 10000) + assert.Nil(t, err) + assert.True(t, result) +} diff --git a/internal/master/stats_processor_test.go b/internal/master/stats_processor_test.go new file mode 100644 index 0000000000..a472ee9fcb --- /dev/null +++ b/internal/master/stats_processor_test.go @@ -0,0 +1,102 @@ +package master + +import ( + "context" + "sync/atomic" + "testing" + "time" + + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "go.etcd.io/etcd/clientv3" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + + pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +func TestStatsProcess(t *testing.T) { + Init() + etcdAddress := Params.EtcdAddress + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) + assert.Nil(t, err) + rootPath := "/test/root" + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + _, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix()) + assert.Nil(t, err) + + kvBase := etcdkv.NewEtcdKV(cli, rootPath) + mt, err := NewMetaTable(kvBase) + assert.Nil(t, err) + + var cnt int64 = 0 + globalTsoAllocator := func() (Timestamp, error) { + val := atomic.AddInt64(&cnt, 1) + phy := time.Now().UnixNano() / int64(time.Millisecond) + ts := tsoutil.ComposeTS(phy, val) + return ts, nil + } + statsProcessor := NewStatsProcessor(mt, globalTsoAllocator) + + ts, err := globalTsoAllocator() + assert.Nil(t, err) + + collID := int64(1001) + collName := "test_coll" + partitionTag := "test_part" + err = mt.AddCollection(&pb.CollectionMeta{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Name: collName, + }, + CreateTime: 0, + SegmentIDs: []UniqueID{}, + PartitionTags: []string{}, + }) + assert.Nil(t, err) + err = mt.AddPartition(collID, partitionTag) + assert.Nil(t, err) + err = mt.AddSegment(&pb.SegmentMeta{ + SegmentID: 100, + CollectionID: collID, + PartitionTag: partitionTag, + ChannelStart: 0, + ChannelEnd: 1, + OpenTime: ts, + }) + assert.Nil(t, err) + stats := internalpb.QueryNodeStats{ + MsgType: internalpb.MsgType_kQueryNodeStats, + PeerID: 1, + SegStats: []*internalpb.SegmentStats{ + {SegmentID: 100, MemorySize: 2500000, NumRows: 25000, RecentlyModified: true}, + }, + } + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{1}, + } + msg := msgstream.QueryNodeStatsMsg{ + QueryNodeStats: stats, + BaseMsg: baseMsg, + } + + var tsMsg msgstream.TsMsg = &msg + msgPack := msgstream.MsgPack{ + Msgs: make([]msgstream.TsMsg, 0), + } + msgPack.Msgs = append(msgPack.Msgs, tsMsg) + err = statsProcessor.ProcessQueryNodeStats(&msgPack) + assert.Nil(t, err) + + segMeta, _ := mt.GetSegmentByID(100) + assert.Equal(t, int64(100), segMeta.SegmentID) + assert.Equal(t, int64(2500000), segMeta.MemSize) + assert.Equal(t, int64(25000), segMeta.NumRows) + +}