From f1b08a98c55fcdcad5d809fea89bc40953247185 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Wed, 9 Sep 2020 11:53:06 +0800
Subject: [PATCH] Update segment cgo api unittest

Signed-off-by: bigsheeper
---
 core/src/dog_segment/segment_c.cpp |  26 ++---
 core/src/dog_segment/segment_c.h   |  26 ++---
 reader/query_node.go               |  12 ++-
 reader/segment.go                  |  84 +++++++-------
 reader/segment_test.go             | 155 +++++++++++++++++++++++++++--
 5 files changed, 225 insertions(+), 78 deletions(-)

diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp
index 3109113407..1b43321d11 100644
--- a/core/src/dog_segment/segment_c.cpp
+++ b/core/src/dog_segment/segment_c.cpp
@@ -30,15 +30,13 @@ DeleteSegment(CSegmentBase segment) {

 int
 Insert(CSegmentBase c_segment,
-           long int reserved_offset,
-           signed long int size,
-           const long* primary_keys,
-           const unsigned long* timestamps,
-           void* raw_data,
-           int sizeof_per_row,
-           signed long int count,
-           unsigned long timestamp_min,
-           unsigned long timestamp_max) {
+       long int reserved_offset,
+       signed long int size,
+       const long* primary_keys,
+       const unsigned long* timestamps,
+       void* raw_data,
+       int sizeof_per_row,
+       signed long int count) {
     auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
     milvus::dog_segment::DogDataChunk dataChunk{};

@@ -63,12 +61,10 @@ PreInsert(CSegmentBase c_segment, long int size) {

 int
 Delete(CSegmentBase c_segment,
-           long int reserved_offset,
-           long size,
-           const long* primary_keys,
-           const unsigned long* timestamps,
-           unsigned long timestamp_min,
-           unsigned long timestamp_max) {
+       long int reserved_offset,
+       long size,
+       const long* primary_keys,
+       const unsigned long* timestamps) {
     auto segment = (milvus::dog_segment::SegmentBase*)c_segment;

     auto res = segment->Delete(reserved_offset, size, primary_keys, timestamps);

diff --git a/core/src/dog_segment/segment_c.h b/core/src/dog_segment/segment_c.h
index 40645f0764..937ec69578 100644
--- a/core/src/dog_segment/segment_c.h
+++ b/core/src/dog_segment/segment_c.h
@@ -17,27 +17,23 @@ DeleteSegment(CSegmentBase segment);

 int
 Insert(CSegmentBase c_segment,
-           long int reserved_offset,
-           signed long int size,
-           const long* primary_keys,
-           const unsigned long* timestamps,
-           void* raw_data,
-           int sizeof_per_row,
-           signed long int count,
-           unsigned long timestamp_min,
-           unsigned long timestamp_max);
+       long int reserved_offset,
+       signed long int size,
+       const long* primary_keys,
+       const unsigned long* timestamps,
+       void* raw_data,
+       int sizeof_per_row,
+       signed long int count);

 long int
 PreInsert(CSegmentBase c_segment, long int size);

 int
 Delete(CSegmentBase c_segment,
-           long int reserved_offset,
-           long size,
-           const long* primary_keys,
-           const unsigned long* timestamps,
-           unsigned long timestamp_min,
-           unsigned long timestamp_max);
+       long int reserved_offset,
+       long size,
+       const long* primary_keys,
+       const unsigned long* timestamps);

 long int
 PreDelete(CSegmentBase c_segment, long int size);
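Reviewer note: the header change above splits writes into a reserve step (PreInsert/PreDelete return a starting offset) and a write step (Insert/Delete take reserved_offset), and drops the timestamp_min/timestamp_max parameters. A minimal cgo sketch of the new calling sequence, assuming the cgo preamble that reader/segment.go already uses (it includes segment_c.h and links the dog_segment library); reserveAndInsert is a hypothetical helper, not part of this patch:

package reader

/*
#include "segment_c.h" // assumed to be on the include path, as in reader/segment.go
*/
import "C"

import (
	"fmt"
	"unsafe"
)

// reserveAndInsert shows the two-step write path introduced by this patch:
// PreInsert reserves room for n rows and returns the starting offset,
// and Insert then writes the rows into that reservation.
func reserveAndInsert(seg C.CSegmentBase, ids []int64, ts []uint64, raw []byte, rowSize int) error {
	n := C.long(len(ids))
	offset := C.PreInsert(seg, n) // reserve n rows
	status := C.Insert(seg, offset, n,
		(*C.long)(&ids[0]),  // primary keys; C.long is 64-bit here (linux/amd64), matching int64
		(*C.ulong)(&ts[0]),  // timestamps
		unsafe.Pointer(&raw[0]),
		C.int(rowSize),
		n)
	if status != 0 {
		return fmt.Errorf("insert failed, error code = %d", int(status))
	}
	return nil
}

The reservation makes room for the rows up front, so the subsequent Insert writes into a known, contiguous slot.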
diff --git a/reader/query_node.go b/reader/query_node.go
index 3722067d47..c969ff5f1b 100644
--- a/reader/query_node.go
+++ b/reader/query_node.go
@@ -236,6 +236,7 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}
+
 		var numOfRecords = len(node.insertData.insertRecords[segmentID])
 		var offset = targetSegment.SegmentPreInsert(numOfRecords)
 		node.insertData.insertOffset[segmentID] = offset
@@ -254,6 +255,7 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
 			fmt.Println(err.Error())
 			return msgPb.Status{ErrorCode: 1}
 		}
+
 		var numOfRecords = len(node.deleteData.deleteIDs[segmentID])
 		var offset = targetSegment.SegmentPreDelete(numOfRecords)
 		node.deleteData.deleteOffset[segmentID] = offset
@@ -287,9 +289,12 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
 		fmt.Println(err.Error())
 		return msgPb.Status{ErrorCode: 1}
 	}
+
 	ids := node.insertData.insertIDs[segmentID]
 	timestamps := node.insertData.insertTimestamps[segmentID]
-	err = targetSegment.SegmentInsert(&ids, &timestamps, records)
+	offsets := node.insertData.insertOffset[segmentID]
+
+	err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
 	if err != nil {
 		fmt.Println(err.Error())
 		return msgPb.Status{ErrorCode: 1}
@@ -305,7 +310,10 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes
 		fmt.Println(err.Error())
 		return msgPb.Status{ErrorCode: 1}
 	}
-	err = segment.SegmentDelete(deleteIDs, deleteTimestamps)
+
+	offset := node.deleteData.deleteOffset[segmentID]
+
+	err = segment.SegmentDelete(offset, deleteIDs, deleteTimestamps)
 	if err != nil {
 		fmt.Println(err.Error())
 		return msgPb.Status{ErrorCode: 1}
diff --git a/reader/segment.go b/reader/segment.go
index aa668bb7a7..683268b9a0 100644
--- a/reader/segment.go
+++ b/reader/segment.go
@@ -27,8 +27,8 @@ const (
 )

 type Segment struct {
-	SegmentPtr C.CSegmentBase
-	SegmentId  int64
+	SegmentPtr       C.CSegmentBase
+	SegmentId        int64
 	SegmentCloseTime uint64
 }
@@ -77,54 +77,58 @@ func (s *Segment) Close() error {

 ////////////////////////////////////////////////////////////////////////////
 func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
-	var offset = C.PreInsert(numOfRecords)
+	/*C.PreInsert
+	long int
+	PreInsert(CSegmentBase c_segment, long int size);
+	*/
+	var offset = C.PreInsert(s.SegmentPtr, C.long(int64(numOfRecords)))

-	return offset
+	return int64(offset)
 }

 func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
-	var offset = C.PreDelete(numOfRecords)
+	/*C.PreDelete
+	long int
+	PreDelete(CSegmentBase c_segment, long int size);
+	*/
+	var offset = C.PreDelete(s.SegmentPtr, C.long(int64(numOfRecords)))

-	return offset
+	return int64(offset)
 }

-func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte) error {
+func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error {
 	/*C.Insert
 	int
 	Insert(CSegmentBase c_segment,
+	       long int reserved_offset,
 	       signed long int size,
-	       const unsigned long* primary_keys,
+	       const long* primary_keys,
 	       const unsigned long* timestamps,
 	       void* raw_data,
 	       int sizeof_per_row,
-	       signed long int count,
-	       unsigned long timestamp_min,
-	       unsigned long timestamp_max);
+	       signed long int count);
 	*/
 	// Blobs to one big blob
-	var rowData []byte
+	var rawData []byte
 	for i := 0; i < len(*records); i++ {
-		copy(rowData, (*records)[i])
+		rawData = append(rawData, (*records)[i]...)
 	}

-	// TODO: remove hard code schema
-	// auto schema_tmp = std::make_shared<Schema>();
-	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-	// schema_tmp->AddField("age", DataType::INT32);
-	// TODO: remove hard code & fake dataChunk
-	const DIM = 4
-	const N = 3
-	var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
-	var rawData []int8
-	for i := 0; i <= N; i++ {
-		for _, ele := range vec {
-			rawData = append(rawData, int8(ele))
-		}
-		rawData = append(rawData, int8(i))
-	}
-	const sizeofPerRow = 4 + DIM*4
+	var cOffset = C.long(offset)
+	var cNumOfRows = C.long(len(*entityIDs))
+	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
+	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
+	var cSizeofPerRow = C.int(len((*records)[0]))
+	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])

-	var status = C.Insert(s.SegmentPtr, C.long(N), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N), C.ulong(timestampMin), C.ulong(timestampMax))
+	var status = C.Insert(s.SegmentPtr,
+		cOffset,
+		cNumOfRows,
+		cEntityIdsPtr,
+		cTimestampsPtr,
+		cRawDataVoidPtr,
+		cSizeofPerRow,
+		cNumOfRows)

 	if status != 0 {
 		return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
@@ -133,19 +137,21 @@ func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, record
 	return nil
 }

-func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error {
+func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error {
 	/*C.Delete
 	int
 	Delete(CSegmentBase c_segment,
+	       long int reserved_offset,
 	       long size,
-	       const unsigned long* primary_keys,
-	       const unsigned long* timestamps,
-	       unsigned long timestamp_min,
-	       unsigned long timestamp_max);
+	       const long* primary_keys,
+	       const unsigned long* timestamps);
 	*/
-	size := len(*entityIds)
+	var cOffset = C.long(offset)
+	var cSize = C.long(len(*entityIDs))
+	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
+	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])

-	var status = C.Delete(s.SegmentPtr, C.long(size), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), C.ulong(timestampMin), C.ulong(timestampMax))
+	var status = C.Delete(s.SegmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)

 	if status != 0 {
 		return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
@@ -169,7 +175,13 @@ func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorReco
 	resultIds := make([]int64, TopK)
 	resultDistances := make([]float32, TopK)

-	var status = C.Search(s.SegmentPtr, unsafe.Pointer(nil), C.ulong(timestamp), (*C.long)(&resultIds[0]), (*C.float)(&resultDistances[0]))
+	var cQueryPtr = unsafe.Pointer(nil)
+	var cTimestamp = C.ulong(timestamp)
+	var cResultIds = (*C.long)(&resultIds[0])
+	var cResultDistances = (*C.float)(&resultDistances[0])
+
+	var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances)
+
 	if status != 0 {
 		return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
 	}
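Reviewer note: SegmentInsert above hands C a single contiguous buffer, so the per-row blobs must be concatenated first; the loop now appends each record onto rawData (a copy into an empty slice would write nothing). A standalone sketch of that flattening step, with flattenRows as a hypothetical name, not part of this patch:

package main

import (
	"bytes"
	"fmt"
)

// flattenRows concatenates fixed-size row blobs into the one contiguous
// buffer that C.Insert expects; sizeof_per_row is then len(rows[0]).
func flattenRows(rows [][]byte) []byte {
	buf := make([]byte, 0, len(rows)*len(rows[0]))
	for _, row := range rows {
		buf = append(buf, row...) // copy(buf, row) on an empty slice would write nothing
	}
	return buf
}

func main() {
	rows := [][]byte{{1, 2}, {3, 4}, {5, 6}}
	flat := flattenRows(rows)
	fmt.Println(bytes.Equal(flat, []byte{1, 2, 3, 4, 5, 6})) // true
}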
diff --git a/reader/segment_test.go b/reader/segment_test.go
index 39303bca6d..79a488f049 100644
--- a/reader/segment_test.go
+++ b/reader/segment_test.go
@@ -7,135 +7,270 @@ import (
 )

 func TestConstructorAndDestructor(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

-func TestSegmentInsert(t *testing.T) {
+func TestSegment_SegmentInsert(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Create ids and timestamps
 	ids := []int64{1, 2, 3}
 	timestamps := []uint64{0, 0, 0}

-	var err = segment.SegmentInsert(&ids, &timestamps, nil)
+	// 3. Create records, use schema below:
+	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
+	// schema_tmp->AddField("age", DataType::INT32);
+	const DIM = 4
+	const N = 3
+	var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
+	var rawData []byte
+	for _, ele := range vec {
+		rawData = append(rawData, byte(ele))
+	}
+	rawData = append(rawData, byte(1))
+	var records [][]byte
+	for i := 0; i < N; i++ {
+		records = append(records, rawData)
+	}
+
+	// 4. Do PreInsert
+	var offset = segment.SegmentPreInsert(N)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 5. Do Insert
+	var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
 	assert.NoError(t, err)

+	// 6. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }
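Reviewer note: byte(ele) in the test truncates each float32 to a single byte (byte(1.1) is 1), which is fine for fake rows but drops the actual vector values. If full-precision rows matching the fakeVec/age layout were ever needed, a little-endian packing along these lines would do it; makeRow is a hypothetical helper, not part of this patch:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// makeRow packs DIM float32 vector elements plus an int32 age into one
// little-endian row, i.e. the fakeVec(VECTOR_FLOAT) + age(INT32) layout.
func makeRow(vec []float32, age int32) []byte {
	row := make([]byte, 4*len(vec)+4)
	for i, v := range vec {
		binary.LittleEndian.PutUint32(row[4*i:], math.Float32bits(v))
	}
	binary.LittleEndian.PutUint32(row[4*len(vec):], uint32(age))
	return row
}

func main() {
	row := makeRow([]float32{1.1, 2.2, 3.3, 4.4}, 1)
	fmt.Println(len(row)) // 20 bytes: 4 floats + 1 int32
}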

-func TestSegmentDelete(t *testing.T) {
+func TestSegment_SegmentDelete(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Create ids and timestamps
 	ids := []int64{1, 2, 3}
 	timestamps := []uint64{0, 0, 0}

-	var err = segment.SegmentDelete(&ids, &timestamps)
+	// 3. Do PreDelete
+	var offset = segment.SegmentPreDelete(10)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 4. Do Delete
+	var err = segment.SegmentDelete(offset, &ids, &timestamps)
 	assert.NoError(t, err)

+	// 5. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

-func TestSegmentSearch(t *testing.T) {
+func TestSegment_SegmentSearch(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Create ids and timestamps
 	ids := []int64{1, 2, 3}
 	timestamps := []uint64{0, 0, 0}

-	var insertErr = segment.SegmentInsert(&ids, &timestamps, nil)
-	assert.NoError(t, insertErr)
+	// 3. Create records, use schema below:
+	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
+	// schema_tmp->AddField("age", DataType::INT32);
+	const DIM = 4
+	const N = 3
+	var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
+	var rawData []byte
+	for _, ele := range vec {
+		rawData = append(rawData, byte(ele))
+	}
+	rawData = append(rawData, byte(1))
+	var records [][]byte
+	for i := 0; i < N; i++ {
+		records = append(records, rawData)
+	}

+	// 4. Do PreInsert
+	var offset = segment.SegmentPreInsert(N)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 5. Do Insert
+	var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
+	assert.NoError(t, err)
+
+	// 6. Do search
 	var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
 	assert.NoError(t, searchErr)
 	fmt.Println(searchRes)

+	// 7. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

-func TestSegment_GetStatus(t *testing.T) {
+func TestSegment_SegmentPreInsert(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Do PreInsert
+	var offset = segment.SegmentPreInsert(10)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 3. Destruct node, collection, and segment
+	partition.DeleteSegment(segment)
+	collection.DeletePartition(partition)
+	node.DeleteCollection(collection)
+}
+
+func TestSegment_SegmentPreDelete(t *testing.T) {
+	// 1. Construct node, collection, and segment
+	node := NewQueryNode(0, 0)
+	var collection = node.NewCollection("collection0", "fake schema")
+	var partition = collection.NewPartition("partition0")
+	var segment = partition.NewSegment(0)
+
+	// 2. Do PreDelete
+	var offset = segment.SegmentPreDelete(10)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 3. Destruct node, collection, and segment
+	partition.DeleteSegment(segment)
+	collection.DeletePartition(partition)
+	node.DeleteCollection(collection)
+}
+
+// Segment util functions test
+////////////////////////////////////////////////////////////////////////////
+func TestSegment_GetStatus(t *testing.T) {
+	// 1. Construct node, collection, and segment
+	node := NewQueryNode(0, 0)
+	var collection = node.NewCollection("collection0", "fake schema")
+	var partition = collection.NewPartition("partition0")
+	var segment = partition.NewSegment(0)
+
+	// 2. Get segment status
 	var status = segment.GetStatus()
 	assert.Equal(t, status, SegmentOpened)

+	// 3. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

 func TestSegment_Close(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Close segment
 	var err = segment.Close()
 	assert.NoError(t, err)

+	// 3. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

 func TestSegment_GetRowCount(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Create ids and timestamps
 	ids := []int64{1, 2, 3}
 	timestamps := []uint64{0, 0, 0}

-	var err = segment.SegmentInsert(&ids, &timestamps, nil)
+	// 3. Create records, use schema below:
+	// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
+	// schema_tmp->AddField("age", DataType::INT32);
+	const DIM = 4
+	const N = 3
+	var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
+	var rawData []byte
+	for _, ele := range vec {
+		rawData = append(rawData, byte(ele))
+	}
+	rawData = append(rawData, byte(1))
+	var records [][]byte
+	for i := 0; i < N; i++ {
+		records = append(records, rawData)
+	}
+
+	// 4. Do PreInsert
+	var offset = segment.SegmentPreInsert(N)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 5. Do Insert
+	var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
 	assert.NoError(t, err)

+	// 6. Get segment row count
 	var rowCount = segment.GetRowCount()
 	assert.Equal(t, rowCount, int64(len(ids)))

+	// 7. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }

 func TestSegment_GetDeletedCount(t *testing.T) {
+	// 1. Construct node, collection, and segment
 	node := NewQueryNode(0, 0)
 	var collection = node.NewCollection("collection0", "fake schema")
 	var partition = collection.NewPartition("partition0")
 	var segment = partition.NewSegment(0)

+	// 2. Create ids and timestamps
 	ids := []int64{1, 2, 3}
 	timestamps := []uint64{0, 0, 0}

-	var err = segment.SegmentDelete(&ids, &timestamps)
+	// 3. Do PreDelete
+	var offset = segment.SegmentPreDelete(10)
+	assert.GreaterOrEqual(t, offset, int64(0))
+
+	// 4. Do Delete
+	var err = segment.SegmentDelete(offset, &ids, &timestamps)
 	assert.NoError(t, err)

+	// 5. Get segment deleted count
 	var deletedCount = segment.GetDeletedCount()
 	// TODO: assert.Equal(t, deletedCount, len(ids))
 	assert.Equal(t, deletedCount, int64(0))

+	// 6. Destruct node, collection, and segment
 	partition.DeleteSegment(segment)
 	collection.DeletePartition(partition)
 	node.DeleteCollection(collection)
 }
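Reviewer note: taken together, the reader-side flow after this patch is reserve, then write at the reserved offset. A condensed sketch of how the pieces compose, using the reader package's own types; the row bytes are placeholder fake data as in the tests, error handling is elided, and lifecycleSketch is a hypothetical helper, not part of this patch:

// lifecycleSketch condenses the post-patch write path: PreInsert/PreDelete
// reserve slots, Insert/Delete write at the reserved offsets.
func lifecycleSketch() {
	node := NewQueryNode(0, 0)
	collection := node.NewCollection("collection0", "fake schema")
	partition := collection.NewPartition("partition0")
	segment := partition.NewSegment(0)

	ids := []int64{1, 2, 3}
	timestamps := []uint64{0, 0, 0}
	row := []byte{1, 2, 3, 4, 1} // fake 5-byte row, as in the tests
	records := [][]byte{row, row, row}

	insertOffset := segment.SegmentPreInsert(len(records)) // reserve, then write
	_ = segment.SegmentInsert(insertOffset, &ids, &timestamps, &records)

	deleteOffset := segment.SegmentPreDelete(len(ids)) // reserve, then delete
	_ = segment.SegmentDelete(deleteOffset, &ids, &timestamps)

	partition.DeleteSegment(segment)
	collection.DeletePartition(partition)
	node.DeleteCollection(collection)
}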