diff --git a/core/src/dog_segment/Collection.cpp b/core/src/dog_segment/Collection.cpp index b45b82d60d..c7028988a8 100644 --- a/core/src/dog_segment/Collection.cpp +++ b/core/src/dog_segment/Collection.cpp @@ -11,7 +11,4 @@ Collection::set_index() {} void Collection::parse() {} -void -Collection::AddNewPartition() {} - } diff --git a/core/src/dog_segment/Collection.h b/core/src/dog_segment/Collection.h index 2ad0b47634..4c24257b3d 100644 --- a/core/src/dog_segment/Collection.h +++ b/core/src/dog_segment/Collection.h @@ -15,15 +15,19 @@ public: // TODO: config to schema void parse(); - void AddNewPartition(); +public: + SchemaPtr& get_schema() { + return schema_; + } private: // TODO: add Index ptr // IndexPtr index_ = nullptr; std::string collection_name_; std::string schema_json_; - milvus::dog_segment::SchemaPtr schema_; - std::vector partitions_; + SchemaPtr schema_; }; +using CollectionPtr = std::unique_ptr; + } diff --git a/core/src/dog_segment/Partition.cpp b/core/src/dog_segment/Partition.cpp index cd31a5ca50..0772e02f0a 100644 --- a/core/src/dog_segment/Partition.cpp +++ b/core/src/dog_segment/Partition.cpp @@ -2,19 +2,7 @@ namespace milvus::dog_segment { -Partition::Partition(std::string& partition_name): - partition_name_(partition_name) {} - -void -Partition::AddNewSegment(uint64_t segment_id) { - auto segment = CreateSegment(); - segment->set_segment_id(segment_id); - segments_.emplace_back(segment); -} - -Partition* -CreatePartition() { - -} +Partition::Partition(std::string& partition_name, SchemaPtr& schema): + partition_name_(partition_name), schema_(schema) {} } diff --git a/core/src/dog_segment/Partition.h b/core/src/dog_segment/Partition.h index bea4f26276..86ef0059de 100644 --- a/core/src/dog_segment/Partition.h +++ b/core/src/dog_segment/Partition.h @@ -6,21 +6,18 @@ namespace milvus::dog_segment { class Partition { public: - explicit Partition(std::string& partition_name); + explicit Partition(std::string& partition_name, SchemaPtr& schema); - const std::vector &segments() const { - return segments_; +public: + SchemaPtr& get_schema() { + return schema_; } - void AddNewSegment(uint64_t segment_id); - private: std::string partition_name_; - std::vector segments_; + SchemaPtr schema_; }; -using PartitionPtr = std::shared_ptr; - -Partition* CreatePartiton(); +using PartitionPtr = std::unique_ptr; } \ No newline at end of file diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h index 03abaa29a0..84afcc8d93 100644 --- a/core/src/dog_segment/SegmentBase.h +++ b/core/src/dog_segment/SegmentBase.h @@ -102,12 +102,9 @@ class SegmentBase { uint64_t segment_id_; }; -using SegmentBasePtr = std::shared_ptr; +using SegmentBasePtr = std::unique_ptr; -std::unique_ptr CreateSegment(SchemaPtr ptr); - -// TODO: Delete this after schema parse function done -SegmentBase* CreateSegment(); +SegmentBasePtr CreateSegment(SchemaPtr& ptr); } // namespace engine } // namespace milvus diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp index 08821d03b6..c245bbe67c 100644 --- a/core/src/dog_segment/SegmentNaive.cpp +++ b/core/src/dog_segment/SegmentNaive.cpp @@ -128,11 +128,8 @@ class SegmentNaive : public SegmentBase { } public: - friend std::unique_ptr - CreateSegment(SchemaPtr schema); - - friend SegmentBase* - CreateSegment(); + friend SegmentBasePtr + CreateSegment(SchemaPtr& schema); private: SchemaPtr schema_; @@ -147,22 +144,17 @@ class SegmentNaive : public SegmentBase { tbb::concurrent_unordered_multimap delete_logs_; }; -std::unique_ptr -CreateSegment(SchemaPtr schema) { - auto segment = std::make_unique(); - segment->schema_ = schema; - segment->entity_vecs_.resize(schema->size()); - return segment; -} +SegmentBasePtr +CreateSegment(SchemaPtr& schema) { + // TODO: remove hard code + auto schema_tmp = std::make_shared(); + schema_tmp->AddField("fakevec", DataType::VECTOR_FLOAT, 16); + schema_tmp->AddField("age", DataType::INT32); -SegmentBase* CreateSegment() { - auto segment = new SegmentNaive(); - auto schema = std::make_shared(); - schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16); - schema->AddField("age", DataType::INT32); - segment->schema_ = schema; - segment->entity_vecs_.resize(schema->size()); - return segment; + auto segment = std::make_unique(); + segment->schema_ = schema_tmp; + segment->entity_vecs_.resize(schema_tmp->size()); + return segment; } Status diff --git a/core/src/dog_segment/collection_c.cpp b/core/src/dog_segment/collection_c.cpp index 4ba3fc5618..a0ff4ade0a 100644 --- a/core/src/dog_segment/collection_c.cpp +++ b/core/src/dog_segment/collection_c.cpp @@ -1,11 +1,19 @@ #include "collection_c.h" +#include "Collection.h" CCollection -NewCollection(const char* collection_name) { +NewCollection(const char* collection_name, const char* schema_conf) { + auto name = std::string(collection_name); + auto conf = std::string(schema_conf); + auto collection = std::make_unique(name, conf); + + return (void*)collection.release(); } void DeleteCollection(CCollection collection) { + auto col = (milvus::dog_segment::Collection*)collection; + delete col; } diff --git a/core/src/dog_segment/collection_c.h b/core/src/dog_segment/collection_c.h index fd4a96f118..02ae2c854e 100644 --- a/core/src/dog_segment/collection_c.h +++ b/core/src/dog_segment/collection_c.h @@ -4,7 +4,7 @@ extern "C" { typedef void* CCollection; -CCollection NewCollection(const char* collection_name); +CCollection NewCollection(const char* collection_name, const char* schema_conf); void DeleteCollection(CCollection collection); diff --git a/core/src/dog_segment/partition_c.cpp b/core/src/dog_segment/partition_c.cpp index a40b4e30eb..89336fcf70 100644 --- a/core/src/dog_segment/partition_c.cpp +++ b/core/src/dog_segment/partition_c.cpp @@ -4,9 +4,19 @@ CPartition NewPartition(CCollection collection, const char* partition_name) { - auto name = std::string(partition_name); - auto partition = new milvus::dog_segment::Partition(name); + auto c = (milvus::dog_segment::Collection*)collection; - auto co = (milvus::dog_segment::Collection*)collection; - co->AddNewPartition(); + auto name = std::string(partition_name); + + auto schema = c->get_schema(); + + auto partition = std::make_unique(name, schema); + + return (void*)partition.release(); +} + +void DeletePartition(CPartition partition) { + auto p = (milvus::dog_segment::Partition*)partition; + + delete p; } diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp index f337c4babd..22a1e47b41 100644 --- a/core/src/dog_segment/segment_c.cpp +++ b/core/src/dog_segment/segment_c.cpp @@ -1,25 +1,23 @@ #include "SegmentBase.h" #include "segment_c.h" +#include "Partition.h" CSegmentBase -SegmentBaseInit(unsigned long segment_id) { - std::cout << "Hello milvus" << std::endl; - auto seg = milvus::dog_segment::CreateSegment(); - seg->set_segment_id(segment_id); - return (void*)seg; +NewSegment(CPartition partition, unsigned long segment_id) { + auto p = (milvus::dog_segment::Partition*)partition; + + auto segment = milvus::dog_segment::CreateSegment(p->get_schema()); + + segment->set_segment_id(segment_id); + + return (void*)segment.release(); } -//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values) { -// auto segment = (milvus::dog_segment::SegmentBase*)c_segment; -// milvus::dog_segment::DogDataChunk dataChunk{}; -// -// dataChunk.raw_data = values.raw_data; -// dataChunk.sizeof_per_row = values.sizeof_per_row; -// dataChunk.count = values.count; -// -// auto res = segment->Insert(size, primary_keys, timestamps, dataChunk); -// return res.code(); -//} +void DeleteSegment(CSegmentBase segment) { + auto s = (milvus::dog_segment::SegmentBase*)segment; + + delete s; +} int Insert(CSegmentBase c_segment, signed long int size, diff --git a/core/src/dog_segment/segment_c.h b/core/src/dog_segment/segment_c.h index 1be07d4ff6..3ebe9c1bda 100644 --- a/core/src/dog_segment/segment_c.h +++ b/core/src/dog_segment/segment_c.h @@ -2,17 +2,13 @@ extern "C" { #endif -//struct DogDataChunk { -// void* raw_data; // schema -// int sizeof_per_row; // alignment -// signed long int count; -//}; +#include "partition_c.h" typedef void* CSegmentBase; -CSegmentBase SegmentBaseInit(unsigned long segment_id); +CSegmentBase NewSegment(CPartition partition, unsigned long segment_id); -//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values); +void DeleteSegment(CSegmentBase segment); int Insert(CSegmentBase c_segment, signed long int size, diff --git a/core/unittest/test_c_api.cpp b/core/unittest/test_c_api.cpp index f2843182c5..b9993a280f 100644 --- a/core/unittest/test_c_api.cpp +++ b/core/unittest/test_c_api.cpp @@ -2,13 +2,14 @@ #include #include #include +#include #include "dog_segment/segment_c.h" #include "dog_segment/collection_c.h" TEST(SegmentTest, InsertTest) { - auto segment_id = 0; - auto s = SegmentBaseInit(segment_id); + auto fake_schema = std::make_shared(); + auto s = milvus::dog_segment::CreateSegment(fake_schema).release(); std::vector raw_data; std::vector timestamps; diff --git a/reader/collection.go b/reader/collection.go index dfe7945756..fb4d0ae72e 100644 --- a/reader/collection.go +++ b/reader/collection.go @@ -11,28 +11,28 @@ type Collection struct { Partitions []*Partition } -// TODO: Schema -type CollectionSchema string - -func NewCollection(collectionName string, schema CollectionSchema) (*Collection, error) { - cName := C.CString(collectionName) - cSchema := C.CString(schema) - collection, status := C.NewCollection(cName, cSchema) +func (c *Collection) NewPartition(partitionName string) (*Partition, error) { + cName := C.CString(partitionName) + partitionPtr, status := C.NewPartition(c.CollectionPtr, cName) if status != 0 { - return nil, errors.New("create collection failed") + return nil, errors.New("create partition failed") } - return &Collection{CollectionPtr: collection, CollectionName: collectionName}, nil + var newPartition = &Partition{PartitionPtr: partitionPtr, PartitionName: partitionName} + c.Partitions = append(c.Partitions, newPartition) + return newPartition, nil } -func DeleteCollection(collection *Collection) error { - status := C.DeleteCollection(collection.CollectionPtr) +func (c *Collection) DeletePartition(partitionName string) error { + cName := C.CString(partitionName) + status := C.DeletePartition(c.CollectionPtr, cName) if status != 0 { - return errors.New("delete collection failed") + return errors.New("create partition failed") } + // TODO: remove from c.Partitions return nil } diff --git a/reader/partition.go b/reader/partition.go index 97b698d8a9..f6c921606a 100644 --- a/reader/partition.go +++ b/reader/partition.go @@ -9,24 +9,25 @@ type Partition struct { Segments []*Segment } -func (c *Collection) NewPartition(partitionName string) (*Partition, error) { - cName := C.CString(partitionName) - partitionPtr, status := C.NewPartition(c.CollectionPtr, cName) +func (p *Partition) NewSegment(segmentId uint64) (*Segment, error) { + segmentPtr, status := C.NewSegment(p.PartitionPtr, segmentId) if status != 0 { - return nil, errors.New("create partition failed") + return nil, errors.New("create segment failed") } - return &Partition{PartitionPtr: partitionPtr, PartitionName: partitionName}, nil + var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId} + p.Segments = append(p.Segments, newSegment) + return newSegment, nil } -func (c *Collection) DeletePartition(partitionName string) error { - cName := C.CString(partitionName) - status := C.DeletePartition(c.CollectionPtr, cName) +func (p *Partition) DeleteSegment() error { + status := C.DeleteSegment(p.PartitionPtr) if status != 0 { - return errors.New("create partition failed") + return errors.New("delete segment failed") } + // TODO: remove from p.Segments return nil } diff --git a/reader/query_node.go b/reader/query_node.go index dc6a8b2265..0dfc19e8d8 100644 --- a/reader/query_node.go +++ b/reader/query_node.go @@ -48,7 +48,38 @@ func NewQueryNode(timeSync uint64) *QueryNode { } } -func (node *QueryNode)doQueryNode(wg *sync.WaitGroup) { +// TODO: Schema +type CollectionSchema string + +func (node *QueryNode) NewCollection(collectionName string, schema CollectionSchema) (*Collection, error) { + cName := C.CString(collectionName) + cSchema := C.CString(schema) + collection, status := C.NewCollection(cName, cSchema) + + if status != 0 { + return nil, errors.New("create collection failed") + } + + var newCollection = &Collection{CollectionPtr: collection, CollectionName: collectionName} + node.Collections = append(node.Collections, newCollection) + + return newCollection, nil +} + +func (node *QueryNode) DeleteCollection(collection *Collection) error { + status := C.DeleteCollection(collection.CollectionPtr) + + if status != 0 { + return errors.New("delete collection failed") + } + + // TODO: remove from node.Collections + return nil +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +func (node *QueryNode) doQueryNode (wg *sync.WaitGroup) { wg.Add(3) go node.Insert(node.messageClient.InsertMsg, wg) go node.Delete(node.messageClient.DeleteMsg, wg) @@ -67,12 +98,6 @@ func (node *QueryNode) StartMessageClient() { go node.messageClient.ReceiveMessage() } -func (node *QueryNode) AddNewCollection(collectionName string, schema CollectionSchema) error { - var collection, err = NewCollection(collectionName, schema) - node.Collections = append(node.Collections, collection) - return err -} - func (node *QueryNode) GetSegmentByEntityId(entityId int64) *Segment { // TODO: get id2segment info from pulsar return nil @@ -115,13 +140,11 @@ func (node *QueryNode) GetTimeSync() uint64 { func (node *QueryNode) InitQueryNodeCollection() { // TODO: remove hard code, add collection creation request - var collection, _ = NewCollection("collection1", "fakeSchema") - node.Collections = append(node.Collections, collection) - var partition, _ = collection.NewPartition("partition1") - collection.Partitions = append(collection.Partitions, partition) + // TODO: error handle + var newCollection, _ = node.NewCollection("collection1", "fakeSchema") + var newPartition, _ = newCollection.NewPartition("partition1") // TODO: add segment id - var segment, _ = partition.NewSegment(0) - partition.Segments = append(partition.Segments, segment) + var _, _ = newPartition.NewSegment(0) } func (node *QueryNode) SegmentsManagement() { diff --git a/reader/segment.go b/reader/segment.go index 45dcd6cceb..4cafb2165e 100644 --- a/reader/segment.go +++ b/reader/segment.go @@ -11,7 +11,6 @@ package reader */ import "C" import ( - "errors" "suvlim/pulsar/schema" ) @@ -23,27 +22,6 @@ type Segment struct { SegmentCloseTime uint64 } -func (p *Partition) NewSegment(segmentId uint64) (*Segment, error) { - // TODO: add segment id - segmentPtr, status := C.SegmentBaseInit(p.PartitionPtr) - - if status != 0 { - return nil, errors.New("create segment failed") - } - - return &Segment{SegmentPtr: segmentPtr}, nil -} - -func (p *Partition) DeleteSegment() error { - status := C.DeleteSegment(p.PartitionPtr) - - if status != 0 { - return errors.New("delete segment failed") - } - - return nil -} - func (s *Segment) GetRowCount() int64 { // TODO: C type to go type return C.GetRowCount(s)