From a38f539b9b26bbe8f303fa5c5812b33b14218ded Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Wed, 25 Nov 2020 10:31:51 +0800 Subject: [PATCH] Move c headers to segcore folder Signed-off-by: FluorineDog --- internal/core/CMakeLists.txt | 2 +- internal/core/src/segcore/plan_c.h | 2 +- internal/core/src/segcore/segment_c.h | 4 +- internal/reader/collection.go | 4 +- internal/reader/collection_replica.go | 114 ++++- internal/reader/collection_replica_test.go | 511 ++++++++++++++++++++- internal/reader/index.go | 4 +- internal/reader/meta_service.go | 12 + internal/reader/meta_service_test.go | 74 ++- internal/reader/partition.go | 4 +- internal/reader/plan.go | 6 +- internal/reader/query_node.go | 4 +- internal/reader/segment.go | 8 +- 13 files changed, 709 insertions(+), 40 deletions(-) diff --git a/internal/core/CMakeLists.txt b/internal/core/CMakeLists.txt index b9c0301713..b1c2a94e5e 100644 --- a/internal/core/CMakeLists.txt +++ b/internal/core/CMakeLists.txt @@ -203,7 +203,7 @@ set( GPU_ENABLE "false" ) install( DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/ - DESTINATION include + DESTINATION include/segcore/ FILES_MATCHING PATTERN "*_c.h" ) diff --git a/internal/core/src/segcore/plan_c.h b/internal/core/src/segcore/plan_c.h index 0a46922729..1b03470c87 100644 --- a/internal/core/src/segcore/plan_c.h +++ b/internal/core/src/segcore/plan_c.h @@ -13,7 +13,7 @@ extern "C" { #endif -#include "collection_c.h" +#include "segcore/collection_c.h" #include #include diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 29c1d17bfa..5e681bc689 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -14,8 +14,8 @@ extern "C" { #endif #include -#include "collection_c.h" -#include "plan_c.h" +#include "segcore/collection_c.h" +#include "segcore/plan_c.h" #include typedef void* CSegmentBase; diff --git a/internal/reader/collection.go b/internal/reader/collection.go index 952429f70c..9d7e3fb511 100644 --- a/internal/reader/collection.go +++ b/internal/reader/collection.go @@ -6,8 +6,8 @@ package reader #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" */ import "C" diff --git a/internal/reader/collection_replica.go b/internal/reader/collection_replica.go index b1d17c33a8..af8b4d3d27 100644 --- a/internal/reader/collection_replica.go +++ b/internal/reader/collection_replica.go @@ -6,13 +6,14 @@ package reader #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" */ import "C" import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "log" "strconv" "sync" @@ -37,13 +38,18 @@ type collectionReplica interface { removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) + hasCollection(collectionID UniqueID) bool // partition // Partition tags in different collections are not unique, // so partition api should specify the target collection. 
+ getPartitionNum(collectionID UniqueID) (int, error) addPartition(collectionID UniqueID, partitionTag string) error removePartition(collectionID UniqueID, partitionTag string) error + addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error + removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) + hasPartition(collectionID UniqueID, partitionTag string) bool // segment getSegmentNum() int @@ -142,7 +148,31 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri return nil, errors.New("Cannot found collection: " + collectionName) } +func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + for _, col := range colReplica.collections { + if col.ID() == collectionID { + return true + } + } + return false +} + //----------------------------------------------------------------------------------------------------- partition +func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) (int, error) { + collection, err := colReplica.getCollectionByID(collectionID) + if err != nil { + return -1, err + } + + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + return len(collection.partitions), nil +} + func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { collection, err := colReplica.getCollectionByID(collectionID) if err != nil { @@ -182,6 +212,61 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, return nil } +func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error { + if !colReplica.hasCollection(colMeta.ID) { + err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10)) + return err + } + pToAdd := make([]string, 0) + for _, partitionTag := range colMeta.PartitionTags { + if !colReplica.hasPartition(colMeta.ID, partitionTag) { + pToAdd = append(pToAdd, partitionTag) + } + } + + for _, tag := range pToAdd { + err := colReplica.addPartition(colMeta.ID, tag) + if err != nil { + log.Println(err) + } + } + + return nil +} + +func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error { + col, err := colReplica.getCollectionByID(colMeta.ID) + if err != nil { + return err + } + + colReplica.mu.Lock() + + pToDel := make([]string, 0) + for _, partition := range col.partitions { + hasPartition := false + for _, tag := range colMeta.PartitionTags { + if partition.partitionTag == tag { + hasPartition = true + } + } + if !hasPartition { + pToDel = append(pToDel, partition.partitionTag) + } + } + + colReplica.mu.Unlock() + + for _, tag := range pToDel { + err := colReplica.removePartition(col.ID(), tag) + if err != nil { + log.Println(err) + } + } + + return nil +} + func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { collection, err := colReplica.getCollectionByID(collectionID) if err != nil { @@ -200,6 +285,25 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID return nil, errors.New("cannot find partition, tag = " + partitionTag) } +func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool { + collection, err := colReplica.getCollectionByID(collectionID) + if err != nil { + log.Println(err) + return 
false + } + + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + for _, p := range *collection.Partitions() { + if p.Tag() == partitionTag { + return true + } + } + + return false +} + //----------------------------------------------------------------------------------------------------- segment func (colReplica *collectionReplicaImpl) getSegmentNum() int { colReplica.mu.RLock() @@ -209,6 +313,9 @@ func (colReplica *collectionReplicaImpl) getSegmentNum() int { } func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + var statisticData = make([]*internalpb.SegmentStats, 0) for segmentID, segment := range colReplica.segments { @@ -306,6 +413,9 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { //----------------------------------------------------------------------------------------------------- func (colReplica *collectionReplicaImpl) freeAll() { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + for _, seg := range colReplica.segments { deleteSegment(seg) } diff --git a/internal/reader/collection_replica_test.go b/internal/reader/collection_replica_test.go index 864cd38ed6..1bbe00d424 100644 --- a/internal/reader/collection_replica_test.go +++ b/internal/reader/collection_replica_test.go @@ -13,7 +13,60 @@ import ( ) //----------------------------------------------------------------------------------------------------- collection -func TestColSegContainer_addCollection(t *testing.T) { +func TestCollectionReplica_getCollectionNum(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + (*node.replica).freeAll() +} + +func TestCollectionReplica_addCollection(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -66,9 +119,11 @@ func TestColSegContainer_addCollection(t *testing.T) { assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + (*node.replica).freeAll() } -func TestColSegContainer_removeCollection(t *testing.T) { +func TestCollectionReplica_removeCollection(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -127,9 +182,11 @@ func TestColSegContainer_removeCollection(t *testing.T) { err = (*node.replica).removeCollection(collectionID) assert.NoError(t, err) assert.Equal(t, (*node.replica).getCollectionNum(), 0) + + (*node.replica).freeAll() } -func TestColSegContainer_getCollectionByID(t 
*testing.T) { +func TestCollectionReplica_getCollectionByID(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -189,9 +246,11 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.ID, UniqueID(0)) + + (*node.replica).freeAll() } -func TestColSegContainer_getCollectionByName(t *testing.T) { +func TestCollectionReplica_getCollectionByName(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -251,10 +310,68 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.ID, UniqueID(0)) + + (*node.replica).freeAll() +} + +func TestCollectionReplica_hasCollection(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + hasCollection := (*node.replica).hasCollection(UniqueID(0)) + assert.Equal(t, hasCollection, true) + hasCollection = (*node.replica).hasCollection(UniqueID(1)) + assert.Equal(t, hasCollection, false) + + (*node.replica).freeAll() } //----------------------------------------------------------------------------------------------------- partition -func TestColSegContainer_addPartition(t *testing.T) { +func TestCollectionReplica_getPartitionNum(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -317,9 +434,82 @@ func TestColSegContainer_addPartition(t *testing.T) { assert.NoError(t, err) assert.Equal(t, partition.partitionTag, "default") } + + partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 1) + + (*node.replica).freeAll() } -func TestColSegContainer_removePartition(t *testing.T) { +func TestCollectionReplica_addPartition(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + collectionID := UniqueID(0) + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, 
+ CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.replica).getCollectionByName(collectionName) + assert.NoError(t, err) + + assert.Equal(t, collection.meta.Schema.Name, collectionName) + assert.Equal(t, collection.meta.ID, collectionID) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + for _, tag := range collectionMeta.PartitionTags { + err := (*node.replica).addPartition(collectionID, tag) + assert.NoError(t, err) + partition, err := (*node.replica).getPartitionByTag(collectionID, tag) + assert.NoError(t, err) + assert.Equal(t, partition.partitionTag, "default") + } + + (*node.replica).freeAll() +} + +func TestCollectionReplica_removePartition(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -385,9 +575,157 @@ func TestColSegContainer_removePartition(t *testing.T) { err = (*node.replica).removePartition(collectionID, partitionTag) assert.NoError(t, err) } + + (*node.replica).freeAll() } -func TestColSegContainer_getPartitionByTag(t *testing.T) { +func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + collectionID := UniqueID(0) + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"p0"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.replica).getCollectionByName(collectionName) + assert.NoError(t, err) + + assert.Equal(t, collection.meta.Schema.Name, collectionName) + assert.Equal(t, collection.meta.ID, collectionID) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + collectionMeta.PartitionTags = []string{"p0", "p1", "p2"} + + err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta) + assert.NoError(t, err) + partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 3) + hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, true) + + (*node.replica).freeAll() +} + +func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + collectionID := UniqueID(0) + fieldVec := schemapb.FieldSchema{ + Name: "vec", + 
DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"p0", "p1", "p2"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.replica).getCollectionByName(collectionName) + assert.NoError(t, err) + + assert.Equal(t, collection.meta.Schema.Name, collectionName) + assert.Equal(t, collection.meta.ID, collectionID) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + collectionMeta.PartitionTags = []string{"p0"} + + err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta) + assert.NoError(t, err) + partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 1) + hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, false) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, false) + + (*node.replica).freeAll() +} + +func TestCollectionReplica_getPartitionByTag(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -451,10 +789,78 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { assert.Equal(t, partition.partitionTag, "default") assert.NotNil(t, partition) } + + (*node.replica).freeAll() +} + +func TestCollectionReplica_hasPartition(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + collectionID := UniqueID(0) + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.replica).getCollectionByName(collectionName) + assert.NoError(t, err) + + assert.Equal(t, collection.meta.Schema.Name, collectionName) + assert.Equal(t, collection.meta.ID, collectionID) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + hasPartition := 
(*node.replica).hasPartition(UniqueID(0), "default") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "default1") + assert.Equal(t, hasPartition, false) + + (*node.replica).freeAll() } //----------------------------------------------------------------------------------------------------- segment -func TestColSegContainer_addSegment(t *testing.T) { +func TestCollectionReplica_addSegment(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -521,9 +927,11 @@ func TestColSegContainer_addSegment(t *testing.T) { assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } + + (*node.replica).freeAll() } -func TestColSegContainer_removeSegment(t *testing.T) { +func TestCollectionReplica_removeSegment(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -592,9 +1000,11 @@ func TestColSegContainer_removeSegment(t *testing.T) { err = (*node.replica).removeSegment(UniqueID(i)) assert.NoError(t, err) } + + (*node.replica).freeAll() } -func TestColSegContainer_getSegmentByID(t *testing.T) { +func TestCollectionReplica_getSegmentByID(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -661,9 +1071,11 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } + + (*node.replica).freeAll() } -func TestColSegContainer_hasSegment(t *testing.T) { +func TestCollectionReplica_hasSegment(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0) @@ -734,4 +1146,81 @@ func TestColSegContainer_hasSegment(t *testing.T) { hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) assert.Equal(t, hasSeg, false) } + + (*node.replica).freeAll() +} + +func TestCollectionReplica_freeAll(t *testing.T) { + ctx := context.Background() + node := NewQueryNode(ctx, 0) + + collectionName := "collection0" + collectionID := UniqueID(0) + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: "collection0", + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.replica).getCollectionByName(collectionName) + assert.NoError(t, err) + + assert.Equal(t, collection.meta.Schema.Name, collectionName) + assert.Equal(t, collection.meta.ID, UniqueID(0)) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) + + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + const segmentNum = 3 + for i := 0; i < segmentNum; i++ { + err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + assert.NoError(t, err) + targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) + assert.NoError(t, err) + assert.Equal(t, targetSeg.segmentID, UniqueID(i)) + hasSeg := 
(*node.replica).hasSegment(UniqueID(i)) + assert.Equal(t, hasSeg, true) + hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) + assert.Equal(t, hasSeg, false) + } + + (*node.replica).freeAll() } diff --git a/internal/reader/index.go b/internal/reader/index.go index f30979d876..9233892efb 100644 --- a/internal/reader/index.go +++ b/internal/reader/index.go @@ -6,8 +6,8 @@ package reader #cgo LDFLAGS: -L../core/output/lib -lmilvus_segcore -Wl,-rpath=../core/output/lib -#include "collection_c.h" -#include "segment_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" */ import "C" diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 535f45b88d..620cdd181f 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -214,6 +214,18 @@ func (mService *metaService) processSegmentModify(id string, value string) { func (mService *metaService) processCollectionModify(id string, value string) { println("Modify Collection: ", id) + + col := mService.collectionUnmarshal(value) + if col != nil { + err := (*mService.replica).addPartitionsByCollectionMeta(col) + if err != nil { + log.Println(err) + } + err = (*mService.replica).removePartitionsByCollectionMeta(col) + if err != nil { + log.Println(err) + } + } } func (mService *metaService) processModify(key string, msg string) { diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go index 6c0470891a..df80c4a9d5 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/reader/meta_service_test.go @@ -452,7 +452,9 @@ func TestMetaService_processCollectionModify(t *testing.T) { > > segmentIDs: 0 - partition_tags: "default" + partition_tags: "p0" + partition_tags: "p1" + partition_tags: "p2" ` (*node.metaService).processCollectionCreate(id, value) @@ -463,7 +465,19 @@ func TestMetaService_processCollectionModify(t *testing.T) { assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) - // TODO: use different index for testing processCollectionModify + partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 3) + + hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") + assert.Equal(t, hasPartition, false) + newValue := `schema: < name: "test" fields: < @@ -484,13 +498,28 @@ func TestMetaService_processCollectionModify(t *testing.T) { > > segmentIDs: 0 - partition_tags: "default" + partition_tags: "p1" + partition_tags: "p2" + partition_tags: "p3" ` (*node.metaService).processCollectionModify(id, newValue) collection, err = (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) + + partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 3) + + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, false) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") + assert.Equal(t, 
hasPartition, true) } func TestMetaService_processModify(t *testing.T) { @@ -523,7 +552,9 @@ func TestMetaService_processModify(t *testing.T) { > > segmentIDs: 0 - partition_tags: "default" + partition_tags: "p0" + partition_tags: "p1" + partition_tags: "p2" ` (*node.metaService).processCreate(key1, msg1) @@ -534,8 +565,21 @@ func TestMetaService_processModify(t *testing.T) { assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) + partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 3) + + hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") + assert.Equal(t, hasPartition, false) + key2 := "by-dev/segment/0" - msg2 := `partition_tag: "default" + msg2 := `partition_tag: "p1" channel_start: 0 channel_end: 128 close_time: 18446744073709551615 @@ -568,7 +612,9 @@ func TestMetaService_processModify(t *testing.T) { > > segmentIDs: 0 - partition_tags: "default" + partition_tags: "p1" + partition_tags: "p2" + partition_tags: "p3" ` (*node.metaService).processModify(key1, msg3) @@ -576,13 +622,25 @@ func TestMetaService_processModify(t *testing.T) { assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) - msg4 := `partition_tag: "default" + partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, partitionNum, 3) + + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0") + assert.Equal(t, hasPartition, false) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2") + assert.Equal(t, hasPartition, true) + hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") + assert.Equal(t, hasPartition, true) + + msg4 := `partition_tag: "p1" channel_start: 0 channel_end: 128 close_time: 18446744073709551615 ` - // TODO: modify segment for testing processCollectionModify (*node.metaService).processModify(key2, msg4) seg, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) diff --git a/internal/reader/partition.go b/internal/reader/partition.go index ff4b6dcd01..210f1bf02f 100644 --- a/internal/reader/partition.go +++ b/internal/reader/partition.go @@ -6,8 +6,8 @@ package reader #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" */ import "C" diff --git a/internal/reader/plan.go b/internal/reader/plan.go index f028975bc1..23f22bb303 100644 --- a/internal/reader/plan.go +++ b/internal/reader/plan.go @@ -3,9 +3,9 @@ package reader /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" -#include "plan_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" +#include "segcore/plan_c.h" */ import "C" import ( diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index c1d18769cc..f4885fc07b 100644 --- a/internal/reader/query_node.go +++ 
b/internal/reader/query_node.go @@ -6,8 +6,8 @@ package reader #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" */ import "C" diff --git a/internal/reader/segment.go b/internal/reader/segment.go index eabd69daf4..1c7ab40ee2 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -6,10 +6,10 @@ package reader #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib -#include "collection_c.h" -#include "segment_c.h" -#include "plan_c.h" -#include "reduce_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" +#include "segcore/plan_c.h" +#include "segcore/reduce_c.h" */ import "C"
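
Note (illustrative, not part of the patch): the two new replica methods are meant to be driven together from processCollectionModify — first add partition tags that appear in the updated etcd meta, then drop local partitions whose tags no longer appear there. A minimal sketch of that call order, assuming it sits inside the reader package; the helper name reconcilePartitions is hypothetical and only shows the intended usage of the interface methods added above:

package reader

import (
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)

// reconcilePartitions is an illustrative helper (not added by this patch). It shows
// the order processCollectionModify relies on: addPartitionsByCollectionMeta creates
// any partitions newly listed in the etcd meta, and removePartitionsByCollectionMeta
// then deletes local partitions whose tags were dropped from that meta.
func reconcilePartitions(replica collectionReplica, colMeta *etcdpb.CollectionMeta) error {
	if err := replica.addPartitionsByCollectionMeta(colMeta); err != nil {
		return err
	}
	return replica.removePartitionsByCollectionMeta(colMeta)
}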