From b4bd9cf9d65aa2a25b8cda731ac339d26491e760 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 11 Nov 2020 11:22:19 +0800 Subject: [PATCH] Add collection, segment, and partition unittest Signed-off-by: bigsheeper --- go.mod | 1 - go.sum | 2 + internal/master/collection/collection.go | 2 +- internal/reader/col_seg_container.go | 18 +- internal/reader/col_seg_container_test.go | 675 ++++++++++++ internal/reader/collection_test.go | 194 +++- internal/reader/meta_service.go | 6 +- internal/reader/partition_test.go | 141 ++- internal/reader/segment.go | 2 +- internal/reader/segment_test.go | 1136 +++++++++++++-------- 10 files changed, 1631 insertions(+), 546 deletions(-) create mode 100644 internal/reader/col_seg_container_test.go diff --git a/go.mod b/go.mod index d66b97adf5..1eb6a53930 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/coreos/etcd v3.3.25+incompatible // indirect github.com/frankban/quicktest v1.10.2 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect - github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 diff --git a/go.sum b/go.sum index 5556e6f74d..6084003359 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,7 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= @@ -242,6 +243,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/protocolbuffers/protobuf v3.13.0+incompatible h1:omZA3Tuq+U2kJ2uMuqMR9c1VO5qLEgZ19m9878fXNtg= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/internal/master/collection/collection.go b/internal/master/collection/collection.go index 097b4ae8b9..281c32299a 100644 --- a/internal/master/collection/collection.go +++ b/internal/master/collection/collection.go @@ -5,7 +5,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" jsoniter "github.com/json-iterator/go" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" diff --git a/internal/reader/col_seg_container.go b/internal/reader/col_seg_container.go index bfb0f05b06..c71150bd57 100644 --- a/internal/reader/col_seg_container.go +++ 
b/internal/reader/col_seg_container.go @@ -77,9 +77,9 @@ func (container *ColSegContainer) getCollectionByName(collectionName string) (*C } //----------------------------------------------------------------------------------------------------- partition -func (container *ColSegContainer) addPartition(collection *Collection, partitionTag string) error { +func (container *ColSegContainer) addPartition(collection *Collection, partitionTag string) (*Partition, error) { if collection == nil { - return errors.New("null collection") + return nil, errors.New("null collection") } var newPartition = newPartition(partitionTag) @@ -87,11 +87,11 @@ func (container *ColSegContainer) addPartition(collection *Collection, partition for _, col := range container.collections { if col.Name() == collection.Name() { *col.Partitions() = append(*col.Partitions(), newPartition) - return nil + return newPartition, nil } } - return errors.New("cannot find collection, name = " + collection.Name()) + return nil, errors.New("cannot find collection, name = " + collection.Name()) } func (container *ColSegContainer) removePartition(partition *Partition) error { @@ -138,13 +138,13 @@ func (container *ColSegContainer) getPartitionByTag(partitionTag string) (*Parti } //----------------------------------------------------------------------------------------------------- segment -func (container *ColSegContainer) addSegment(collection *Collection, partition *Partition, segmentID int64) error { +func (container *ColSegContainer) addSegment(collection *Collection, partition *Partition, segmentID int64) (*Segment, error) { if collection == nil { - return errors.New("null collection") + return nil, errors.New("null collection") } if partition == nil { - return errors.New("null partition") + return nil, errors.New("null partition") } var newSegment = newSegment(collection, segmentID) @@ -155,13 +155,13 @@ func (container *ColSegContainer) addSegment(collection *Collection, partition * for _, p := range *col.Partitions() { if p.Tag() == partition.Tag() { *p.Segments() = append(*p.Segments(), newSegment) - return nil + return newSegment, nil } } } } - return errors.New("cannot find collection or segment") + return nil, errors.New("cannot find collection or segment") } func (container *ColSegContainer) removeSegment(segment *Segment) error { diff --git a/internal/reader/col_seg_container_test.go b/internal/reader/col_seg_container_test.go new file mode 100644 index 0000000000..014d43fc7d --- /dev/null +++ b/internal/reader/col_seg_container_test.go @@ -0,0 +1,675 @@ +package reader + +import ( + "context" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "testing" +) + +//----------------------------------------------------------------------------------------------------- collection +func TestColSegContainer_addCollection(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + 
schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) +} + +func TestColSegContainer_removeCollection(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + err := node.container.removeCollection(collection) + assert.NoError(t, err) + assert.Equal(t, len(node.container.collections), 0) +} + +func TestColSegContainer_getCollectionByID(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + targetCollection, err := node.container.getCollectionByID(UniqueID(0)) + assert.NoError(t, err) + assert.NotNil(t, targetCollection) + assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") + assert.Equal(t, targetCollection.meta.Id, UniqueID(0)) +} + +func 
TestColSegContainer_getCollectionByName(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + targetCollection, err := node.container.getCollectionByName("collection0") + assert.NoError(t, err) + assert.NotNil(t, targetCollection) + assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") + assert.Equal(t, targetCollection.meta.Id, UniqueID(0)) +} + +//----------------------------------------------------------------------------------------------------- partition +func TestColSegContainer_addPartition(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + for _, tag := range collectionMeta.PartitionTags { + targetPartition, err := node.container.addPartition(collection, tag) + assert.NoError(t, err) + assert.Equal(t, targetPartition.partitionTag, "default") + } +} + +func TestColSegContainer_removePartition(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + 
} + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + for _, tag := range collectionMeta.PartitionTags { + targetPartition, err := node.container.addPartition(collection, tag) + assert.NoError(t, err) + assert.Equal(t, targetPartition.partitionTag, "default") + err = node.container.removePartition(targetPartition) + assert.NoError(t, err) + } +} + +func TestColSegContainer_getPartitionByTag(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + for _, tag := range collectionMeta.PartitionTags { + targetPartition, err := node.container.addPartition(collection, tag) + assert.NoError(t, err) + assert.Equal(t, targetPartition.partitionTag, "default") + partition, err := node.container.getPartitionByTag(tag) + assert.NoError(t, err) + assert.NotNil(t, partition) + assert.Equal(t, partition.partitionTag, "default") + } +} + +//----------------------------------------------------------------------------------------------------- segment +func TestColSegContainer_addSegment(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + 
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + const segmentNum = 3 + for i := 0; i < segmentNum; i++ { + targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, targetSeg.segmentID, UniqueID(i)) + } +} + +func TestColSegContainer_removeSegment(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + const segmentNum = 3 + for i := 0; i < segmentNum; i++ { + targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, targetSeg.segmentID, UniqueID(i)) + err = node.container.removeSegment(targetSeg) + assert.NoError(t, err) + } +} + +func TestColSegContainer_getSegmentByID(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 
1) + + partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + const segmentNum = 3 + for i := 0; i < segmentNum; i++ { + targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, targetSeg.segmentID, UniqueID(i)) + seg, err := node.container.getSegmentByID(UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, seg.segmentID, UniqueID(i)) + } +} + +func TestColSegContainer_hasSegment(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + const segmentNum = 3 + for i := 0; i < segmentNum; i++ { + targetSeg, err := node.container.addSegment(collection, partition, UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, targetSeg.segmentID, UniqueID(i)) + hasSeg := node.container.hasSegment(UniqueID(i)) + assert.Equal(t, hasSeg, true) + hasSeg = node.container.hasSegment(UniqueID(i + 100)) + assert.Equal(t, hasSeg, false) + } +} diff --git a/internal/reader/collection_test.go b/internal/reader/collection_test.go index 24b7cbe2cb..e05964b867 100644 --- a/internal/reader/collection_test.go +++ b/internal/reader/collection_test.go @@ -1,33 +1,165 @@ package reader -//func TestCollection_NewPartition(t *testing.T) { -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, collection.CollectionID, int64(0)) -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, len(collection.Partitions), 1) -//} -// -//func TestCollection_DeletePartition(t *testing.T) { -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, collection.CollectionID, int64(0)) -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, len(collection.Partitions), 1) -// -// collection.deletePartition(node, 
partition) -// -// assert.Equal(t, len(collection.Partitions), 0) -//} +import ( + "context" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "testing" +) + +func TestCollection_Partitions(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + for _, tag := range collectionMeta.PartitionTags { + _, err := node.container.addPartition(collection, tag) + assert.NoError(t, err) + } + + partitions := collection.Partitions() + assert.Equal(t, len(collectionMeta.PartitionTags), len(*partitions)) +} + +func TestCollection_newCollection(t *testing.T) { + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) +} + +func TestCollection_deleteCollection(t *testing.T) { + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: 
[]UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + + deleteCollection(collection) +} diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 9a70ffc502..a9f99bac8f 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -3,7 +3,7 @@ package reader import ( "context" "fmt" - "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" "log" "path" "reflect" @@ -144,7 +144,7 @@ func (mService *metaService) processCollectionCreate(id string, value string) { if col != nil { newCollection := mService.container.addCollection(col, value) for _, partitionTag := range col.PartitionTags { - err := mService.container.addPartition(newCollection, partitionTag) + _, err := mService.container.addPartition(newCollection, partitionTag) if err != nil { log.Println(err) } @@ -174,7 +174,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) { return } if partition != nil { - err = mService.container.addSegment(col, partition, seg.SegmentId) + _, err = mService.container.addSegment(col, partition, seg.SegmentId) if err != nil { log.Println(err) return diff --git a/internal/reader/partition_test.go b/internal/reader/partition_test.go index f9268cdecf..4e68962e6c 100644 --- a/internal/reader/partition_test.go +++ b/internal/reader/partition_test.go @@ -1,57 +1,88 @@ package reader -//func TestPartition_NewSegment(t *testing.T) { -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// -// var segment = partition.newSegment(0) -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, collection.CollectionID, int64(0)) -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0)) -// -// assert.Equal(t, len(collection.Partitions), 1) -// assert.Equal(t, len(node.Collections), 1) -// assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) -// -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, node.foundSegmentBySegmentID(int64(0)), true) -//} -// -//func TestPartition_DeleteSegment(t *testing.T) { -// // 1. 
Construct node, collection, partition and segment -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// -// var segment = partition.newSegment(0) -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, collection.CollectionID, int64(0)) -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0)) -// -// assert.Equal(t, len(collection.Partitions), 1) -// assert.Equal(t, len(node.Collections), 1) -// assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) -// -// assert.Equal(t, segment.SegmentID, int64(0)) -// -// // 2. Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// -// assert.Equal(t, len(collection.Partitions), 1) -// assert.Equal(t, len(node.Collections), 1) -// assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 0) -// assert.Equal(t, node.foundSegmentBySegmentID(int64(0)), false) -//} +import ( + "context" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "testing" +) + +func TestPartition_Segments(t *testing.T) { + ctx := context.Background() + pulsarUrl := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarUrl) + + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) + + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + assert.Equal(t, len(node.container.collections), 1) + + for _, tag := range collectionMeta.PartitionTags { + _, err := node.container.addPartition(collection, tag) + assert.NoError(t, err) + } + + partitions := collection.Partitions() + assert.Equal(t, len(collectionMeta.PartitionTags), len(*partitions)) + + targetPartition := (*partitions)[0] + + const segmentNum = 3 + for i:= 0; i < segmentNum; i++ { + _, err := node.container.addSegment(collection, targetPartition, UniqueID(i)) + assert.NoError(t, err) + } + + segments := targetPartition.Segments() + assert.Equal(t, segmentNum, len(*segments)) +} + +func TestPartition_newPartition(t *testing.T) { + partitionTag := "default" + partition := newPartition(partitionTag) + assert.Equal(t, partition.partitionTag, partitionTag) +} diff --git a/internal/reader/segment.go b/internal/reader/segment.go index 
7c67fac8fb..88965482ee 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -84,7 +84,7 @@ func (s *Segment) getMemSize() int64 { return int64(memoryUsageInBytes) } -//-------------------------------------------------------------------------------------- preprocess functions +//-------------------------------------------------------------------------------------- preDm functions func (s *Segment) segmentPreInsert(numOfRecords int) int64 { /* long int diff --git a/internal/reader/segment_test.go b/internal/reader/segment_test.go index a0baad00d3..cb357bad07 100644 --- a/internal/reader/segment_test.go +++ b/internal/reader/segment_test.go @@ -1,147 +1,541 @@ package reader -//import ( -// "context" -// "encoding/binary" -// "fmt" -// "math" -// "testing" -// -// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" -// -// "github.com/stretchr/testify/assert" -// msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" -//) -// -//func TestSegment_ConstructorAndDestructor(t *testing.T) { -// // 1. Construct node, collection, partition and segment -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestSegment_SegmentInsert(t *testing.T) { -// // 1. Construct node, collection, partition and segment -// ctx := context.Background() -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Create ids and timestamps -// ids := []int64{1, 2, 3} -// timestamps := []uint64{0, 0, 0} -// -// // 3. Create records, use schema below: -// // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); -// // schema_tmp->AddField("age", DataType::INT32); -// const DIM = 16 -// const N = 3 -// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} -// var rawData []byte -// for _, ele := range vec { -// buf := make([]byte, 4) -// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) -// rawData = append(rawData, buf...) -// } -// bs := make([]byte, 4) -// binary.LittleEndian.PutUint32(bs, 1) -// rawData = append(rawData, bs...) -// var records []*commonpb.Blob -// for i := 0; i < N; i++ { -// blob := &commonpb.Blob{ -// Value: rawData, -// } -// records = append(records, blob) -// } -// -// // 4. 
Do PreInsert -// var offset = segment.segmentPreInsert(N) -// assert.GreaterOrEqual(t, offset, int64(0)) -// -// // 5. Do Insert -// var err = segment.segmentInsert(offset, &ids, ×tamps, &records) -// assert.NoError(t, err) -// -// // 6. Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestSegment_SegmentDelete(t *testing.T) { -// ctx := context.Background() -// // 1. Construct node, collection, partition and segment -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Create ids and timestamps -// ids := []int64{1, 2, 3} -// timestamps := []uint64{0, 0, 0} -// -// // 3. Do PreDelete -// var offset = segment.segmentPreDelete(10) -// assert.GreaterOrEqual(t, offset, int64(0)) -// -// // 4. Do Delete -// var err = segment.segmentDelete(offset, &ids, ×tamps) -// assert.NoError(t, err) -// -// // 5. Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestSegment_SegmentSearch(t *testing.T) { +import ( + "encoding/binary" + "github.com/golang/protobuf/proto" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "math" + "testing" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + + "github.com/stretchr/testify/assert" +) + +//-------------------------------------------------------------------------------------- constructor and destructor +func TestSegment_newSegment(t *testing.T) { + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + + segmentID := UniqueID(0) + segment := newSegment(collection, segmentID) + assert.Equal(t, segmentID, segment.segmentID) +} + +func TestSegment_deleteSegment(t *testing.T) { + fieldVec := 
schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + + segmentID := UniqueID(0) + segment := newSegment(collection, segmentID) + assert.Equal(t, segmentID, segment.segmentID) + + deleteSegment(segment) +} + +//-------------------------------------------------------------------------------------- stats functions +func TestSegment_getRowCount(t *testing.T) { + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + + segmentID := UniqueID(0) + segment := newSegment(collection, segmentID) + assert.Equal(t, segmentID, segment.segmentID) + + ids := []int64{1, 2, 3} + timestamps := []uint64{0, 0, 0} + + const DIM = 16 + const N = 3 + var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + var rawData []byte + for _, ele := range vec { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) + rawData = append(rawData, buf...) + } + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, 1) + rawData = append(rawData, bs...) 
+ var records []*commonpb.Blob
+ for i := 0; i < N; i++ {
+ blob := &commonpb.Blob{
+ Value: rawData,
+ }
+ records = append(records, blob)
+ }
+
+ var offset = segment.segmentPreInsert(N)
+ assert.GreaterOrEqual(t, offset, int64(0))
+
+ err := segment.segmentInsert(offset, &ids, &timestamps, &records)
+ assert.NoError(t, err)
+
+ rowCount := segment.getRowCount()
+ assert.Equal(t, int64(N), rowCount)
+}
+
+func TestSegment_getDeletedCount(t *testing.T) {
+ fieldVec := schemapb.FieldSchema{
+ Name: "vec",
+ DataType: schemapb.DataType_VECTOR_FLOAT,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "16",
+ },
+ },
+ }
+
+ fieldInt := schemapb.FieldSchema{
+ Name: "age",
+ DataType: schemapb.DataType_INT32,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "1",
+ },
+ },
+ }
+
+ schema := schemapb.CollectionSchema{
+ Name: "collection0",
+ Fields: []*schemapb.FieldSchema{
+ &fieldVec, &fieldInt,
+ },
+ }
+
+ collectionMeta := etcdpb.CollectionMeta{
+ Id: UniqueID(0),
+ Schema: &schema,
+ CreateTime: Timestamp(0),
+ SegmentIds: []UniqueID{0},
+ PartitionTags: []string{"default"},
+ }
+
+ collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+ assert.NotEqual(t, "", collectionMetaBlob)
+
+ collection := newCollection(&collectionMeta, collectionMetaBlob)
+ assert.Equal(t, collection.meta.Schema.Name, "collection0")
+ assert.Equal(t, collection.meta.Id, UniqueID(0))
+
+ segmentID := UniqueID(0)
+ segment := newSegment(collection, segmentID)
+ assert.Equal(t, segmentID, segment.segmentID)
+
+ ids := []int64{1, 2, 3}
+ timestamps := []uint64{0, 0, 0}
+
+ const DIM = 16
+ const N = 3
+ var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+ var rawData []byte
+ for _, ele := range vec {
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+ rawData = append(rawData, buf...)
+ }
+ bs := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bs, 1)
+ rawData = append(rawData, bs...)
+ var records []*commonpb.Blob
+ for i := 0; i < N; i++ {
+ blob := &commonpb.Blob{
+ Value: rawData,
+ }
+ records = append(records, blob)
+ }
+
+ var offsetInsert = segment.segmentPreInsert(N)
+ assert.GreaterOrEqual(t, offsetInsert, int64(0))
+
+ var err = segment.segmentInsert(offsetInsert, &ids, &timestamps, &records)
+ assert.NoError(t, err)
+
+ var offsetDelete = segment.segmentPreDelete(10)
+ assert.GreaterOrEqual(t, offsetDelete, int64(0))
+
+ err = segment.segmentDelete(offsetDelete, &ids, &timestamps)
+ assert.NoError(t, err)
+
+ var deletedCount = segment.getDeletedCount()
+ // TODO: assert.Equal(t, deletedCount, len(ids))
+ assert.Equal(t, deletedCount, int64(0))
+}
+
+func TestSegment_getMemSize(t *testing.T) {
+ fieldVec := schemapb.FieldSchema{
+ Name: "vec",
+ DataType: schemapb.DataType_VECTOR_FLOAT,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "16",
+ },
+ },
+ }
+
+ fieldInt := schemapb.FieldSchema{
+ Name: "age",
+ DataType: schemapb.DataType_INT32,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "1",
+ },
+ },
+ }
+
+ schema := schemapb.CollectionSchema{
+ Name: "collection0",
+ Fields: []*schemapb.FieldSchema{
+ &fieldVec, &fieldInt,
+ },
+ }
+
+ collectionMeta := etcdpb.CollectionMeta{
+ Id: UniqueID(0),
+ Schema: &schema,
+ CreateTime: Timestamp(0),
+ SegmentIds: []UniqueID{0},
+ PartitionTags: []string{"default"},
+ }
+
+ collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+ assert.NotEqual(t, "", collectionMetaBlob)
+
+ collection := newCollection(&collectionMeta, collectionMetaBlob)
+ assert.Equal(t, collection.meta.Schema.Name, "collection0")
+ assert.Equal(t, collection.meta.Id, UniqueID(0))
+
+ segmentID := UniqueID(0)
+ segment := newSegment(collection, segmentID)
+ assert.Equal(t, segmentID, segment.segmentID)
+
+ ids := []int64{1, 2, 3}
+ timestamps := []uint64{0, 0, 0}
+
+ const DIM = 16
+ const N = 3
+ var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+ var rawData []byte
+ for _, ele := range vec {
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+ rawData = append(rawData, buf...)
+ }
+ bs := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bs, 1)
+ rawData = append(rawData, bs...)
+ var records []*commonpb.Blob
+ for i := 0; i < N; i++ {
+ blob := &commonpb.Blob{
+ Value: rawData,
+ }
+ records = append(records, blob)
+ }
+
+ var offset = segment.segmentPreInsert(N)
+ assert.GreaterOrEqual(t, offset, int64(0))
+
+ err := segment.segmentInsert(offset, &ids, &timestamps, &records)
+ assert.NoError(t, err)
+
+ var memSize = segment.getMemSize()
+ assert.Equal(t, memSize, int64(2785280))
+}
+
+//-------------------------------------------------------------------------------------- dm & search functions
+func TestSegment_segmentInsert(t *testing.T) {
+ fieldVec := schemapb.FieldSchema{
+ Name: "vec",
+ DataType: schemapb.DataType_VECTOR_FLOAT,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "16",
+ },
+ },
+ }
+
+ fieldInt := schemapb.FieldSchema{
+ Name: "age",
+ DataType: schemapb.DataType_INT32,
+ TypeParams: []*commonpb.KeyValuePair{
+ {
+ Key: "dim",
+ Value: "1",
+ },
+ },
+ }
+
+ schema := schemapb.CollectionSchema{
+ Name: "collection0",
+ Fields: []*schemapb.FieldSchema{
+ &fieldVec, &fieldInt,
+ },
+ }
+
+ collectionMeta := etcdpb.CollectionMeta{
+ Id: UniqueID(0),
+ Schema: &schema,
+ CreateTime: Timestamp(0),
+ SegmentIds: []UniqueID{0},
+ PartitionTags: []string{"default"},
+ }
+
+ collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+ assert.NotEqual(t, "", collectionMetaBlob)
+
+ collection := newCollection(&collectionMeta, collectionMetaBlob)
+ assert.Equal(t, collection.meta.Schema.Name, "collection0")
+ assert.Equal(t, collection.meta.Id, UniqueID(0))
+
+ segmentID := UniqueID(0)
+ segment := newSegment(collection, segmentID)
+ assert.Equal(t, segmentID, segment.segmentID)
+
+ ids := []int64{1, 2, 3}
+ timestamps := []uint64{0, 0, 0}
+
+ const DIM = 16
+ const N = 3
+ var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+ var rawData []byte
+ for _, ele := range vec {
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+ rawData = append(rawData, buf...)
+ }
+ bs := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bs, 1)
+ rawData = append(rawData, bs...)
+ var records []*commonpb.Blob + for i := 0; i < N; i++ { + blob := &commonpb.Blob{ + Value: rawData, + } + records = append(records, blob) + } + + var offset = segment.segmentPreInsert(N) + assert.GreaterOrEqual(t, offset, int64(0)) + + err := segment.segmentInsert(offset, &ids, ×tamps, &records) + assert.NoError(t, err) +} + +func TestSegment_segmentDelete(t *testing.T) { + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + Id: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIds: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + collection := newCollection(&collectionMeta, collectionMetaBlob) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.Id, UniqueID(0)) + + segmentID := UniqueID(0) + segment := newSegment(collection, segmentID) + assert.Equal(t, segmentID, segment.segmentID) + + ids := []int64{1, 2, 3} + timestamps := []uint64{0, 0, 0} + + const DIM = 16 + const N = 3 + var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + var rawData []byte + for _, ele := range vec { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) + rawData = append(rawData, buf...) + } + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, 1) + rawData = append(rawData, bs...) + var records []*commonpb.Blob + for i := 0; i < N; i++ { + blob := &commonpb.Blob{ + Value: rawData, + } + records = append(records, blob) + } + + var offsetInsert = segment.segmentPreInsert(N) + assert.GreaterOrEqual(t, offsetInsert, int64(0)) + + var err = segment.segmentInsert(offsetInsert, &ids, ×tamps, &records) + assert.NoError(t, err) + + var offsetDelete = segment.segmentPreDelete(10) + assert.GreaterOrEqual(t, offsetDelete, int64(0)) + + err = segment.segmentDelete(offsetDelete, &ids, ×tamps) + assert.NoError(t, err) +} + +//func TestSegment_segmentSearch(t *testing.T) { // ctx := context.Background() // // 1. Construct node, collection, partition and segment // pulsarUrl := "pulsar://localhost:6650" @@ -220,307 +614,159 @@ package reader // // node.Close() //} -// -//func TestSegment_SegmentPreInsert(t *testing.T) { -// ctx := context.Background() -// // 1. Construct node, collection, partition and segment -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Do PreInsert -// var offset = segment.segmentPreInsert(10) -// assert.GreaterOrEqual(t, offset, int64(0)) -// -// // 3. 
Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestSegment_SegmentPreDelete(t *testing.T) { -// ctx := context.Background() -// // 1. Construct node, collection, partition and segment -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Do PreDelete -// var offset = segment.segmentPreDelete(10) -// assert.GreaterOrEqual(t, offset, int64(0)) -// -// // 3. Destruct collection, partition and segment -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestSegment_GetRowCount(t *testing.T) { -// ctx := context.Background() -// // 1. Construct node, collection, partition and segment -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// // 2. Create ids and timestamps -// ids := []int64{1, 2, 3} -// timestamps := []uint64{0, 0, 0} -// -// // 3. Create records, use schema below: -// // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); -// // schema_tmp->AddField("age", DataType::INT32); -// const DIM = 16 -// const N = 3 -// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} -// var rawData []byte -// for _, ele := range vec { -// buf := make([]byte, 4) -// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) -// rawData = append(rawData, buf...) -// } -// bs := make([]byte, 4) -// binary.LittleEndian.PutUint32(bs, 1) -// rawData = append(rawData, bs...) -// var records []*commonpb.Blob -// for i := 0; i < N; i++ { -// blob := &commonpb.Blob{ -// Value: rawData, -// } -// records = append(records, blob) -// } -// -// // 4. Do PreInsert -// var offset = segment.segmentPreInsert(N) -// assert.GreaterOrEqual(t, offset, int64(0)) -// -// // 5. Do Insert -// var err = segment.segmentInsert(offset, &ids, ×tamps, &records) -// assert.NoError(t, err) -// -// // 6. Get segment row count -// var rowCount = segment.getRowCount() -// assert.Equal(t, rowCount, int64(len(ids))) -// -// // 7. 
-//	partition.deleteSegment(node, segment)
-//	collection.deletePartition(node, partition)
-//	node.deleteCollection(collection)
-//
-//	assert.Equal(t, len(node.Collections), 0)
-//	assert.Equal(t, len(node.SegmentsMap), 0)
-//
-//	node.Close()
-//}
-//
-//func TestSegment_GetDeletedCount(t *testing.T) {
-//	ctx := context.Background()
-//	// 1. Construct node, collection, partition and segment
-//	pulsarUrl := "pulsar://localhost:6650"
-//	node := NewQueryNode(ctx, 0, pulsarUrl)
-//	var collection = node.newCollection(0, "collection0", "")
-//	var partition = collection.newPartition("partition0")
-//	var segment = partition.newSegment(0)
-//
-//	node.SegmentsMap[int64(0)] = segment
-//
-//	assert.Equal(t, collection.CollectionName, "collection0")
-//	assert.Equal(t, partition.partitionTag, "partition0")
-//	assert.Equal(t, segment.SegmentID, int64(0))
-//	assert.Equal(t, len(node.SegmentsMap), 1)
-//
-//	// 2. Create ids and timestamps
-//	ids := []int64{1, 2, 3}
-//	timestamps := []uint64{0, 0, 0}
-//
-//	// 3. Do PreDelete
-//	var offset = segment.segmentPreDelete(10)
-//	assert.GreaterOrEqual(t, offset, int64(0))
-//
-//	// 4. Do Delete
-//	var err = segment.segmentDelete(offset, &ids, &timestamps)
-//	assert.NoError(t, err)
-//
-//	// 5. Get segment deleted count
-//	var deletedCount = segment.getDeletedCount()
-//	// TODO: assert.Equal(t, deletedCount, len(ids))
-//	assert.Equal(t, deletedCount, int64(0))
-//
-//	// 6. Destruct collection, partition and segment
-//	partition.deleteSegment(node, segment)
-//	collection.deletePartition(node, partition)
-//	node.deleteCollection(collection)
-//
-//	assert.Equal(t, len(node.Collections), 0)
-//	assert.Equal(t, len(node.SegmentsMap), 0)
-//
-//	node.Close()
-//}
-//
-//func TestSegment_GetMemSize(t *testing.T) {
-//	ctx := context.Background()
-//	// 1. Construct node, collection, partition and segment
-//	pulsarUrl := "pulsar://localhost:6650"
-//	node := NewQueryNode(ctx, 0, pulsarUrl)
-//	var collection = node.newCollection(0, "collection0", "")
-//	var partition = collection.newPartition("partition0")
-//	var segment = partition.newSegment(0)
-//
-//	node.SegmentsMap[int64(0)] = segment
-//
-//	assert.Equal(t, collection.CollectionName, "collection0")
-//	assert.Equal(t, partition.partitionTag, "partition0")
-//	assert.Equal(t, segment.SegmentID, int64(0))
-//	assert.Equal(t, len(node.SegmentsMap), 1)
-//
-//	// 2. Create ids and timestamps
-//	ids := []int64{1, 2, 3}
-//	timestamps := []uint64{0, 0, 0}
-//
-//	// 3. Create records, use schema below:
-//	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-//	// schema_tmp->AddField("age", DataType::INT32);
-//	const DIM = 16
-//	const N = 3
-//	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-//	var rawData []byte
-//	for _, ele := range vec {
-//		buf := make([]byte, 4)
-//		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-//		rawData = append(rawData, buf...)
-//	}
-//	bs := make([]byte, 4)
-//	binary.LittleEndian.PutUint32(bs, 1)
-//	rawData = append(rawData, bs...)
-//	var records []*commonpb.Blob
-//	for i := 0; i < N; i++ {
-//		blob := &commonpb.Blob{
-//			Value: rawData,
-//		}
-//		records = append(records, blob)
-//	}
-//
-//	// 4. Do PreInsert
-//	var offset = segment.segmentPreInsert(N)
-//	assert.GreaterOrEqual(t, offset, int64(0))
-//
-//	// 5. Do Insert
-//	var err = segment.segmentInsert(offset, &ids, &timestamps, &records)
-//	assert.NoError(t, err)
-//
-//	// 6. Get memory usage in bytes
-//	var memSize = segment.getMemSize()
-//	assert.Equal(t, memSize, int64(2785280))
-//
-//	// 7. Destruct collection, partition and segment
-//	partition.deleteSegment(node, segment)
-//	collection.deletePartition(node, partition)
-//	node.deleteCollection(collection)
-//
-//	assert.Equal(t, len(node.Collections), 0)
-//	assert.Equal(t, len(node.SegmentsMap), 0)
-//
-//	node.Close()
-//}
-//func TestSegment_RealSchemaTest(t *testing.T) {
-//	ctx := context.Background()
-//	// 1. Construct node, collection, partition and segment
-//	var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n " +
-//		"field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n dim: 1\n \u003e\n " +
-//		"field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n dim: 16\n " +
-//		"\u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n"
-//	pulsarUrl := "pulsar://localhost:6650"
-//	node := NewQueryNode(ctx, 0, pulsarUrl)
-//	var collection = node.newCollection(0, "collection0", schemaString)
-//	var partition = collection.newPartition("partition0")
-//	var segment = partition.newSegment(0)
-//
-//	node.SegmentsMap[int64(0)] = segment
-//
-//	assert.Equal(t, collection.CollectionName, "collection0")
-//	assert.Equal(t, partition.partitionTag, "partition0")
-//	assert.Equal(t, segment.SegmentID, int64(0))
-//	assert.Equal(t, len(node.SegmentsMap), 1)
-//
-//	// 2. Create ids and timestamps
-//	ids := []int64{1, 2, 3}
-//	timestamps := []uint64{0, 0, 0}
-//
-//	// 3. Create records, use schema below:
-//	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-//	// schema_tmp->AddField("age", DataType::INT32);
-//	const DIM = 16
-//	const N = 3
-//	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-//	var rawData []byte
-//	for _, ele := range vec {
-//		buf := make([]byte, 4)
-//		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-//		rawData = append(rawData, buf...)
-//	}
-//	bs := make([]byte, 4)
-//	binary.LittleEndian.PutUint32(bs, 1)
-//	rawData = append(rawData, bs...)
-//	var records []*commonpb.Blob
-//	for i := 0; i < N; i++ {
-//		blob := &commonpb.Blob {
-//			Value: rawData,
-//		}
-//		records = append(records, blob)
-//	}
-//
-//	// 4. Do PreInsert
-//	var offset = segment.segmentPreInsert(N)
-//	assert.GreaterOrEqual(t, offset, int64(0))
-//
-//	// 5. Do Insert
-//	var err = segment.segmentInsert(offset, &ids, &timestamps, &records)
-//	assert.NoError(t, err)
-//
-//	// 6. Destruct collection, partition and segment
-//	partition.deleteSegment(node, segment)
-//	collection.deletePartition(node, partition)
-//	node.deleteCollection(collection)
-//
-//	assert.Equal(t, len(node.Collections), 0)
-//	assert.Equal(t, len(node.SegmentsMap), 0)
-//
-//	node.Close()
-//}
+//-------------------------------------------------------------------------------------- preDm functions
+func TestSegment_segmentPreInsert(t *testing.T) {
+	fieldVec := schemapb.FieldSchema{
+		Name: "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key: "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name: "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key: "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		Id: UniqueID(0),
+		Schema: &schema,
+		CreateTime: Timestamp(0),
+		SegmentIds: []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	collection := newCollection(&collectionMeta, collectionMetaBlob)
+	assert.Equal(t, collection.meta.Schema.Name, "collection0")
+	assert.Equal(t, collection.meta.Id, UniqueID(0))
+
+	segmentID := UniqueID(0)
+	segment := newSegment(collection, segmentID)
+	assert.Equal(t, segmentID, segment.segmentID)
+
+	const DIM = 16
+	const N = 3
+	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+	var rawData []byte
+	for _, ele := range vec {
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+		rawData = append(rawData, buf...)
+	}
+	bs := make([]byte, 4)
+	binary.LittleEndian.PutUint32(bs, 1)
+	rawData = append(rawData, bs...)
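+	// Each record below reuses this single raw row: sixteen little-endian float32
+	// values for the "vec" field followed by one 4-byte little-endian integer for
+	// the int32 "age" field, matching the schema declared above.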
+	var records []*commonpb.Blob
+	for i := 0; i < N; i++ {
+		blob := &commonpb.Blob{
+			Value: rawData,
+		}
+		records = append(records, blob)
+	}
+
+	var offset = segment.segmentPreInsert(N)
+	assert.GreaterOrEqual(t, offset, int64(0))
+}
+
+func TestSegment_segmentPreDelete(t *testing.T) {
+	fieldVec := schemapb.FieldSchema{
+		Name: "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key: "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name: "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key: "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		Id: UniqueID(0),
+		Schema: &schema,
+		CreateTime: Timestamp(0),
+		SegmentIds: []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	collection := newCollection(&collectionMeta, collectionMetaBlob)
+	assert.Equal(t, collection.meta.Schema.Name, "collection0")
+	assert.Equal(t, collection.meta.Id, UniqueID(0))
+
+	segmentID := UniqueID(0)
+	segment := newSegment(collection, segmentID)
+	assert.Equal(t, segmentID, segment.segmentID)
+
+	ids := []int64{1, 2, 3}
+	timestamps := []uint64{0, 0, 0}
+
+	const DIM = 16
+	const N = 3
+	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+	var rawData []byte
+	for _, ele := range vec {
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+		rawData = append(rawData, buf...)
+	}
+	bs := make([]byte, 4)
+	binary.LittleEndian.PutUint32(bs, 1)
+	rawData = append(rawData, bs...)
+	var records []*commonpb.Blob
+	for i := 0; i < N; i++ {
+		blob := &commonpb.Blob{
+			Value: rawData,
+		}
+		records = append(records, blob)
+	}
+
+	var offsetInsert = segment.segmentPreInsert(N)
+	assert.GreaterOrEqual(t, offsetInsert, int64(0))
+
+	var err = segment.segmentInsert(offsetInsert, &ids, &timestamps, &records)
+	assert.NoError(t, err)
+
+	var offsetDelete = segment.segmentPreDelete(10)
+	assert.GreaterOrEqual(t, offsetDelete, int64(0))