diff --git a/conf/config.yaml b/conf/config.yaml
index f8bc5117a1..ad018ca847 100644
--- a/conf/config.yaml
+++ b/conf/config.yaml
@@ -52,7 +52,7 @@ reader:
 writer:
   clientid: 0
   stopflag: -2
-  readerqueuesize: 1024
+  readerqueuesize: 10000
   searchbyidchansize: 10000
   topicstart: 0
   topicend: 128
diff --git a/core/src/dog_segment/Collection.cpp b/core/src/dog_segment/Collection.cpp
index 491a110db3..9dc4e157a0 100644
--- a/core/src/dog_segment/Collection.cpp
+++ b/core/src/dog_segment/Collection.cpp
@@ -20,8 +20,8 @@ Collection::AddIndex(const grpc::IndexParam& index_param) {
     auto& index_name = index_param.index_name();
     auto& field_name = index_param.field_name();
 
-    assert(!index_name.empty());
-    assert(!field_name.empty());
+    Assert(!index_name.empty());
+    Assert(!field_name.empty());
 
     auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
     auto index_mode = knowhere::IndexMode::MODE_CPU;
@@ -72,7 +72,7 @@ Collection::AddIndex(const grpc::IndexParam& index_param) {
             dim = field.get_dim();
         }
     }
-    assert(dim != 0);
+    Assert(dim != 0);
 
     index_conf = milvus::knowhere::Config{
         {knowhere::meta::DIM, dim},
diff --git a/core/src/dog_segment/ConcurrentVector.h b/core/src/dog_segment/ConcurrentVector.h
index 5b71d33c6e..9d7b84e08a 100644
--- a/core/src/dog_segment/ConcurrentVector.h
+++ b/core/src/dog_segment/ConcurrentVector.h
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include "EasyAssert.h"
 
 namespace milvus::dog_segment {
 // we don't use std::array because capacity of concurrent_vector wastes too much memory
@@ -18,7 +19,7 @@ namespace milvus::dog_segment {
 //    }
 //    FixedVector(const FixedVector& placeholder_vec)
 //        : std::vector(placeholder_vec.placeholder_size_), is_placeholder_(false) {
-//        // assert(placeholder_vec.is_placeholder_);
+//        // Assert(placeholder_vec.is_placeholder_);
 //    }
 //    FixedVector(FixedVector&&) = delete;
 //
@@ -58,14 +59,14 @@ class ThreadSafeVector {
     }
 
     const Type&
     operator[](int64_t index) const {
-        assert(index < size_);
+        Assert(index < size_);
         std::shared_lock lck(mutex_);
         return vec_[index];
     }
 
     Type&
     operator[](int64_t index) {
-        assert(index < size_);
+        Assert(index < size_);
         std::shared_lock lck(mutex_);
         return vec_[index];
     }
@@ -105,7 +106,7 @@ class ConcurrentVector : public VectorBase {
 public:
     explicit ConcurrentVector(ssize_t dim = 1)
         : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) {
-        assert(is_scalar ? dim == 1 : dim != 1);
+        Assert(is_scalar ? dim == 1 : dim != 1);
     }
 
     void
@@ -171,7 +172,7 @@ class ConcurrentVector : public VectorBase {
 
     const Type&
     operator[](ssize_t element_index) const {
-        assert(Dim == 1);
+        Assert(Dim == 1);
         auto chunk_id = element_index / ElementsPerChunk;
         auto chunk_offset = element_index % ElementsPerChunk;
         return get_chunk(chunk_id)[chunk_offset];
@@ -190,7 +191,7 @@ class ConcurrentVector : public VectorBase {
             return;
         }
         auto chunk_max_size = chunks_.size();
-        assert(chunk_id < chunk_max_size);
+        Assert(chunk_id < chunk_max_size);
         Chunk& chunk = chunks_[chunk_id];
         auto ptr = chunk.data();
         std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim);
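ConcurrentVector stores data in fixed-size chunks, so locating an element is one division and one modulus over ElementsPerChunk (scaled by Dim for vector fields); the new `Assert(chunk_id < chunk_max_size)` guards the computed chunk id before `set_data` copies into it. A minimal sketch of that index arithmetic, with an assumed chunk size (the real ElementsPerChunk constant is not shown in this diff):

```go
package main

import "fmt"

// elementsPerChunk stands in for ConcurrentVector's ElementsPerChunk
// constant, whose value this diff does not show.
const elementsPerChunk = 32 * 1024

// locate maps a flat element index to (chunk id, offset within chunk),
// mirroring the `/` and `%` in ConcurrentVector::operator[].
func locate(elementIndex int64) (chunkID, chunkOffset int64) {
    return elementIndex / elementsPerChunk, elementIndex % elementsPerChunk
}

func main() {
    for _, idx := range []int64{0, elementsPerChunk - 1, elementsPerChunk, 100000} {
        chunk, offset := locate(idx)
        fmt.Printf("element %d -> chunk %d, offset %d\n", idx, chunk, offset)
    }
}
```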
diff --git a/core/src/dog_segment/EasyAssert.h b/core/src/dog_segment/EasyAssert.h
new file mode 100644
index 0000000000..b60fd7e82c
--- /dev/null
+++ b/core/src/dog_segment/EasyAssert.h
@@ -0,0 +1,19 @@
+#pragma once
+#include <string_view>
+
+namespace milvus::impl {
+inline
+void EasyAssertInfo(bool value, std::string_view expr_str, std::string_view filename, int lineno,
+                    std::string_view extra_info) {
+    if (!value) {
+        std::string info;
+        info += "Assert \"" + std::string(expr_str) + "\"";
+        info += " at " + std::string(filename) + ":" + std::to_string(lineno);
+        info += " => " + std::string(extra_info);
+        throw std::runtime_error(info);
+    }
+}
+}
+
+#define AssertInfo(expr, info) impl::EasyAssertInfo(bool(expr), #expr, __FILE__, __LINE__, (info))
+#define Assert(expr) AssertInfo((expr), "")
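The new header replaces raw `assert` (compiled out under NDEBUG) with `Assert`/`AssertInfo`, which always evaluate the condition and, on failure, throw a `std::runtime_error` carrying the failing expression, the call site, and optional context. A rough Go analogue of the same idea, for illustration only (the helper name is invented):

```go
package main

import (
    "fmt"
    "runtime"
)

// assertInfo mirrors EasyAssertInfo: on failure it panics with the failing
// expression, the caller's file:line, and extra context, instead of being
// compiled out like a debug-only assert. (Hypothetical Go analogue.)
func assertInfo(value bool, exprStr, extraInfo string) {
    if !value {
        _, file, line, _ := runtime.Caller(1) // report the caller, not this helper
        panic(fmt.Sprintf("Assert %q at %s:%d => %s", exprStr, file, line, extraInfo))
    }
}

func main() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("caught:", r)
        }
    }()
    fieldName := ""
    assertInfo(fieldName != "", `fieldName != ""`, "field_name must not be empty")
}
```

Because the failure is thrown rather than aborting the process, callers can attach useful context — the IndexMeta changes below pass the missing field_name or index_name as the extra info.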
diff --git a/core/src/dog_segment/IndexMeta.cpp b/core/src/dog_segment/IndexMeta.cpp
index 66faea04f5..06d7b428f4 100644
--- a/core/src/dog_segment/IndexMeta.cpp
+++ b/core/src/dog_segment/IndexMeta.cpp
@@ -19,7 +19,7 @@ IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name
         throw std::invalid_argument("duplicate index_name");
     }
     // TODO: support multiple indexes for single field
-    assert(!lookups_.count(field_name));
+    Assert(!lookups_.count(field_name));
     lookups_[field_name] = index_name;
     entries_[index_name] = std::move(entry);
 
@@ -28,7 +28,7 @@ IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name
 
 Status
 IndexMeta::DropEntry(const std::string& index_name) {
-    assert(entries_.count(index_name));
+    Assert(entries_.count(index_name));
     auto entry = std::move(entries_[index_name]);
     if(lookups_[entry.field_name] == index_name) {
         lookups_.erase(entry.field_name);
@@ -46,9 +46,9 @@ void IndexMeta::VerifyEntry(const Entry &entry) {
     auto& field_meta = schema[entry.field_name];
     // TODO checking
     if(field_meta.is_vector()) {
-        assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ);
+        Assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ);
     } else {
-        assert(false);
+        Assert(false);
     }
 }
diff --git a/core/src/dog_segment/IndexMeta.h b/core/src/dog_segment/IndexMeta.h
index 04856fc49d..18a85d75ba 100644
--- a/core/src/dog_segment/IndexMeta.h
+++ b/core/src/dog_segment/IndexMeta.h
@@ -41,7 +41,9 @@ class IndexMeta {
     }
 
     const Entry&
     lookup_by_field(const std::string& field_name) {
+        AssertInfo(lookups_.count(field_name), field_name);
         auto index_name = lookups_.at(field_name);
+        AssertInfo(entries_.count(index_name), index_name);
         return entries_.at(index_name);
     }
 private:
diff --git a/core/src/dog_segment/SegmentDefs.h b/core/src/dog_segment/SegmentDefs.h
index e986c66158..2156a56bb0 100644
--- a/core/src/dog_segment/SegmentDefs.h
+++ b/core/src/dog_segment/SegmentDefs.h
@@ -8,6 +8,7 @@
 // #include "knowhere/index/Index.h"
 #include "utils/Status.h"
 #include "dog_segment/IndexMeta.h"
+#include "EasyAssert.h"
 
 namespace milvus::dog_segment {
 using Timestamp = uint64_t;  // TODO: use TiKV-like timestamp
@@ -40,7 +41,7 @@ field_sizeof(DataType data_type, int dim = 1) {
         case DataType::VECTOR_FLOAT:
             return sizeof(float) * dim;
         case DataType::VECTOR_BINARY: {
-            assert(dim % 8 == 0);
+            Assert(dim % 8 == 0);
             return dim / 8;
         }
         default: {
@@ -62,7 +63,7 @@ struct FieldMeta {
 
     bool
     is_vector() const {
-        assert(type_ != DataType::NONE);
+        Assert(type_ != DataType::NONE);
         return type_ == DataType::VECTOR_BINARY || type_ == DataType::VECTOR_FLOAT;
     }
@@ -141,6 +142,8 @@ class Schema {
 
     const FieldMeta&
     operator[](int field_index) const {
+        Assert(field_index >= 0);
+        Assert(field_index < fields_.size());
         return fields_[field_index];
     }
diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp
index 61d6ed5e02..10120caf31 100644
--- a/core/src/dog_segment/SegmentNaive.cpp
+++ b/core/src/dog_segment/SegmentNaive.cpp
@@ -9,7 +9,6 @@
 #include
 #include
 
-
 namespace milvus::dog_segment {
 int
 TestABI() {
@@ -18,7 +17,6 @@ TestABI() {
 
 std::unique_ptr<SegmentBase>
 CreateSegment(SchemaPtr schema) {
-
     auto segment = std::make_unique<SegmentNaive>(schema);
     return segment;
 }
@@ -26,10 +24,10 @@ CreateSegment(SchemaPtr schema) {
 SegmentNaive::Record::Record(const Schema &schema) : uids_(1), timestamps_(1) {
     for (auto &field : schema) {
         if (field.is_vector()) {
-            assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+            Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
             entity_vec_.emplace_back(std::make_shared<ConcurrentVector<float>>(field.get_dim()));
         } else {
-            assert(field.get_data_type() == DataType::INT32);
+            Assert(field.get_data_type() == DataType::INT32);
             entity_vec_.emplace_back(std::make_shared<ConcurrentVector<int32_t>>());
         }
     }
@@ -73,7 +71,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
         for (auto iter = iter_b; iter != iter_e; ++iter) {
             auto offset = iter->second;
             if (record_.timestamps_[offset] < query_timestamp) {
-                assert(offset < insert_barrier);
+                Assert(offset < insert_barrier);
                 the_offset = std::max(the_offset, offset);
             }
         }
@@ -102,7 +100,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
                 continue;
             }
             if (record_.timestamps_[offset] < query_timestamp) {
-                assert(offset < insert_barrier);
+                Assert(offset < insert_barrier);
                 the_offset = std::max(the_offset, offset);
             }
         }
@@ -123,12 +121,13 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
 Status
 SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t *uids_raw, const Timestamp *timestamps_raw,
                      const DogDataChunk &entities_raw) {
-    assert(entities_raw.count == size);
+    Assert(entities_raw.count == size);
     if (entities_raw.sizeof_per_row != schema_->get_total_sizeof()) {
         std::string msg = "entity length = " + std::to_string(entities_raw.sizeof_per_row) +
                           ", schema length = " + std::to_string(schema_->get_total_sizeof());
         throw std::runtime_error(msg);
     }
+
     auto raw_data = reinterpret_cast<const char*>(entities_raw.raw_data);
     //  std::vector<char> entities(raw_data, raw_data + size * len_per_row);
@@ -185,13 +184,13 @@
     //    go.detach();
     //    const auto& schema = *schema_;
     //    auto record_ptr = GetMutableRecord();
-    //    assert(record_ptr);
+    //    Assert(record_ptr);
     //    auto& record = *record_ptr;
     //    auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
     //
     //    // TODO: use shared_lock for better concurrency
     //    std::lock_guard lck(mutex_);
-    //    assert(state_ == SegmentState::Open);
+    //    Assert(state_ == SegmentState::Open);
     //    auto ack_id = ack_count_.load();
     //    record.uids_.grow_by(primary_keys, primary_keys + size);
     //    for (int64_t i = 0; i < size; ++i) {
@@ -263,19 +262,20 @@ SegmentNaive::QueryImpl(query::QueryPtr query_info, Timestamp timestamp, QueryRe
     auto ins_barrier = get_barrier(record_, timestamp);
     auto del_barrier = get_barrier(deleted_record_, timestamp);
     auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
-    assert(bitmap_holder);
-    assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);
+    Assert(bitmap_holder);
+    Assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);
 
     auto field_offset = schema_->get_offset(query_info->field_name);
     auto &field = schema_->operator[](query_info->field_name);
-    assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
     auto dim = field.get_dim();
     auto bitmap = bitmap_holder->bitmap_ptr;
     auto topK = query_info->topK;
     auto num_queries = query_info->num_queries;
 
     auto the_offset_opt = schema_->get_offset(query_info->field_name);
-    assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.value() < record_.entity_vec_.size());
     auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));
     auto index_entry = index_meta_->lookup_by_field(query_info->field_name);
     auto conf = index_entry.config;
@@ -354,10 +354,10 @@ SegmentNaive::QueryBruteForceImpl(query::QueryPtr query_info, Timestamp timestam
     auto ins_barrier = get_barrier(record_, timestamp);
     auto del_barrier = get_barrier(deleted_record_, timestamp);
     auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
-    assert(bitmap_holder);
+    Assert(bitmap_holder);
 
     auto &field = schema_->operator[](query_info->field_name);
-    assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
     auto dim = field.get_dim();
     auto bitmap = bitmap_holder->bitmap_ptr;
     auto topK = query_info->topK;
@@ -366,7 +366,8 @@ SegmentNaive::QueryBruteForceImpl(query::QueryPtr query_info, Timestamp timestam
 
     // TODO: optimize
     auto the_offset_opt = schema_->get_offset(query_info->field_name);
-    assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.value() < record_.entity_vec_.size());
     auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));
 
     std::vector<int64_t> final_uids(total_count);
@@ -415,17 +416,18 @@ SegmentNaive::QuerySlowImpl(query::QueryPtr query_info, Timestamp timestamp, Que
     auto ins_barrier = get_barrier(record_, timestamp);
     auto del_barrier = get_barrier(deleted_record_, timestamp);
     auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
-    assert(bitmap_holder);
+    Assert(bitmap_holder);
 
     auto &field = schema_->operator[](query_info->field_name);
-    assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
     auto dim = field.get_dim();
     auto bitmap = bitmap_holder->bitmap_ptr;
     auto topK = query_info->topK;
     auto num_queries = query_info->num_queries;
     // TODO: optimize
     auto the_offset_opt = schema_->get_offset(query_info->field_name);
-    assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.has_value());
+    Assert(the_offset_opt.value() < record_.entity_vec_.size());
     auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));
     std::vector<std::priority_queue<std::pair<float, int64_t>>> records(num_queries);
@@ -521,7 +523,7 @@ SegmentNaive::Close() {
 template<typename Type>
 knowhere::IndexPtr
 SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry &entry) {
     auto offset_opt = schema_->get_offset(entry.field_name);
-    assert(offset_opt.has_value());
+    Assert(offset_opt.has_value());
     auto offset = offset_opt.value();
     auto field = (*schema_)[offset];
     auto dim = field.get_dim();
@@ -563,8 +565,8 @@ SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) {
         }
     }
 
-    assert(dim != 0);
-    assert(!index_field_name.empty());
+    Assert(dim != 0);
+    Assert(!index_field_name.empty());
 
     auto index_meta = std::make_shared<IndexMeta>(schema_);
     // TODO: this is merge of query conf and insert conf
@@ -587,19 +589,21 @@
     if(record_.ack_responder_.GetAck() < 1024 * 4) {
         return Status(SERVER_BUILD_INDEX_ERROR, "too few elements");
     }
+
     index_meta_ = remote_index_meta;
     for (auto&[index_name, entry]: index_meta_->get_entries()) {
-        assert(entry.index_name == index_name);
+        Assert(entry.index_name == index_name);
         const auto &field = (*schema_)[entry.field_name];
         if (field.is_vector()) {
-            assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
+            Assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
             auto index_ptr = BuildVecIndexImpl<float>(entry);
             indexings_[index_name] = index_ptr;
         } else {
             throw std::runtime_error("unimplemented");
         }
     }
+
     index_ready_ = true;
     return Status::OK();
 }
@@ -610,7 +614,7 @@ SegmentNaive::GetMemoryUsageInBytes() {
     if(index_ready_) {
         auto& index_entries = index_meta_->get_entries();
         for(auto [index_name, entry]: index_entries) {
-            assert(schema_->operator[](entry.field_name).is_vector());
+            Assert(schema_->operator[](entry.field_name).is_vector());
             auto vec_ptr = std::static_pointer_cast<knowhere::VecIndex>(indexings_[index_name]);
             total_bytes += vec_ptr->IndexSize();
         }
diff --git a/core/src/dog_segment/SegmentNaive.h b/core/src/dog_segment/SegmentNaive.h
index fd8283763c..74b9b889ab 100644
--- a/core/src/dog_segment/SegmentNaive.h
+++ b/core/src/dog_segment/SegmentNaive.h
@@ -14,6 +14,7 @@
 #include "query/GeneralQuery.h"
 #include "utils/Status.h"
 #include "dog_segment/DeletedRecord.h"
+#include "EasyAssert.h"
 
 namespace milvus::dog_segment {
 struct ColumnBasedDataChunk {
@@ -27,7 +28,7 @@ struct ColumnBasedDataChunk {
         auto align = source.sizeof_per_row;
         for (auto &field : schema) {
             auto len = field.get_sizeof();
-            assert(len % sizeof(float) == 0);
+            Assert(len % sizeof(float) == 0);
             std::vector<float> new_col(len * count / sizeof(float));
             for (int64_t i = 0; i < count; ++i) {
                 memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align, len);
diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go
index e22c48ee02..1ce93bcb4c 100644
--- a/pkg/master/informer/pulsar.go
+++ b/pkg/master/informer/pulsar.go
@@ -53,8 +53,9 @@ func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
        if err != nil {
            log.Println("SegmentUnMarshal Failed")
        }
-       fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
-           msg.ID(), m.SegementID)
+       //fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+       //  msg.ID(), m.SegementID)
+       fmt.Println("Received SegmentStats -- segmentID:", m.SegementID, ",memSize:", m.MemorySize, ",memRate:", m.MemoryRate)
        ssChan <- m
        consumer.Ack(msg)
    }
diff --git a/pkg/master/mock/segment.go b/pkg/master/mock/segment.go
index b101f8a2cf..912311cc35 100644
--- a/pkg/master/mock/segment.go
+++ b/pkg/master/mock/segment.go
@@ -3,10 +3,8 @@ package mock
 import (
    "bytes"
    "encoding/gob"
-   "github.com/golang/protobuf/proto"
-   "time"
-
    masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
+   "github.com/golang/protobuf/proto"
 )
 
 type SegmentStats struct {
@@ -59,7 +57,7 @@ type Segment struct {
    Rows int64 `json:"rows"`
 }
 
-func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
+func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime uint64, closeTime uint64) Segment {
    return Segment{
        SegmentID:    id,
        CollectionID: collectioID,
@@ -67,8 +65,8 @@ func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStar
        PartitionTag: ptag,
        ChannelStart: chStart,
        ChannelEnd:   chEnd,
-       OpenTimeStamp:  uint64(openTime.Unix()),
-       CloseTimeStamp: uint64(closeTime.Unix()),
+       OpenTimeStamp:  openTime,
+       CloseTimeStamp: closeTime,
    }
 }
 func Segment2JSON(s Segment) (string, error) {
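NewSegment now takes raw uint64 timestamps instead of time.Time. Timestamps in this codebase follow a TSO-style hybrid layout — physical time in the high bits, an 18-bit logical counter in the low bits — which is why other hunks shift by 18 (extract physical) or compute `ts << 46 >> 46` (extract logical from a 64-bit value). A small sketch of that layout, inferred from those shifts:

```go
package main

import (
    "fmt"
    "time"
)

// logicalBits is inferred from the <<18 / >>18 shifts in this diff;
// ts<<46>>46 on a 64-bit value likewise keeps the low 18 bits.
const logicalBits = 18

// compose packs physical milliseconds and a logical counter into one uint64.
func compose(physicalMs, logical uint64) uint64 {
    return physicalMs<<logicalBits | logical&(1<<logicalBits-1)
}

// split recovers both halves, mirroring `ts >> 18` and `ts << 46 >> 46`.
func split(ts uint64) (physicalMs, logical uint64) {
    return ts >> logicalBits, ts & (1<<logicalBits - 1)
}

func main() {
    nowMs := uint64(time.Now().UnixNano() / int64(time.Millisecond))
    ts := compose(nowMs, 7)
    physical, logical := split(ts)
    fmt.Println(physical == nowMs, logical) // true 7
}
```

Master-side segment close times, by contrast, are now stored as plain physical milliseconds capped at 1<<46-1 rather than as full hybrid values — that is what GetPhysicalTimeNow below produces.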
"github.com/czs007/suvlim/pkg/master/grpc/master" + "github.com/golang/protobuf/proto" ) type SegmentStats struct { @@ -59,7 +57,7 @@ type Segment struct { Rows int64 `json:"rows"` } -func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { +func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime uint64, closeTime uint64) Segment { return Segment{ SegmentID: id, CollectionID: collectioID, @@ -67,8 +65,8 @@ func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStar PartitionTag: ptag, ChannelStart: chStart, ChannelEnd: chEnd, - OpenTimeStamp: uint64(openTime.Unix()), - CloseTimeStamp: uint64(closeTime.Unix()), + OpenTimeStamp: openTime, + CloseTimeStamp: closeTime, } } func Segment2JSON(s Segment) (string, error) { diff --git a/pkg/master/server.go b/pkg/master/server.go index 9b709b1ef4..aa06c0600f 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -61,16 +61,19 @@ func SegmentStatsController() { } } +func GetPhysicalTimeNow() uint64 { + return uint64(time.Now().UnixNano() / int64(time.Millisecond)) +} + func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats, kvbase kv.Base) error { segmentID := ss.SegementID if _, ok := (*segmentCloseLog)[segmentID]; ok { // This segment has been closed - log.Println("Segment", segmentID, "has been closed") return nil } if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { - currentTime := time.Now() + currentTime := GetPhysicalTimeNow() memRate := int(ss.MemoryRate) if memRate == 0 { //memRate = 1 @@ -80,34 +83,54 @@ func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats, sec := float64(conf.Config.Master.SegmentThreshole*0.2) / float64(memRate) data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID))) if err != nil { + log.Println("Load segment failed") return err } seg, err := mock.JSON2Segment(data) if err != nil { + log.Println("JSON2Segment failed") return err } - segmentLogicTime := seg.CloseTimeStamp << 46 >> 46 - seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) << 18 + segmentLogicTime - fmt.Println("memRate = ", memRate, ",sec = ", sec ,",Close time = ", seg.CloseTimeStamp) + + seg.CloseTimeStamp = currentTime + uint64(sec * 1000) + // Reduce time gap between Proxy and Master + seg.CloseTimeStamp = seg.CloseTimeStamp + uint64(5 * 1000) + fmt.Println("Close segment = ", seg.SegmentID, ",Close time = ", seg.CloseTimeStamp) + updateData, err := mock.Segment2JSON(*seg) if err != nil { + log.Println("Update segment, Segment2JSON failed") return err } - kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) + err = kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData) + if err != nil { + log.Println("Save segment failed") + return err + } + (*segmentCloseLog)[segmentID] = seg.CloseTimeStamp + //create new segment newSegID := id.New().Uint64() - newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0)) + newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, 1 << 46 - 1) newSegData, err := mock.Segment2JSON(*&newSeg) if err != nil { + log.Println("Create new segment, Segment2JSON failed") return err } + //save to kv store - 
@@ -234,8 +262,8 @@ func CollectionController(ch chan *messagepb.Mapping) {
        time.Now(), fieldMetas, []uint64{sID, s2ID},
        []string{"default"})
    cm := mock.GrpcMarshal(&c)
-   s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0))
-   s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0))
+   s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, GetPhysicalTimeNow(), 1<<46-1)
+   s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, GetPhysicalTimeNow(), 1<<46-1)
    collectionData, _ := mock.Collection2JSON(*cm)
    segmentData, err := mock.Segment2JSON(s)
    if err != nil {
@@ -270,37 +298,75 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error {
    })
    defer cli.Close()
    kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
-   sID := id.New().Uint64()
+
    cID := id.New().Uint64()
+
    fieldMetas := []*messagepb.FieldMeta{}
    if collection.Schema != nil {
        fieldMetas = collection.Schema.FieldMetas
    }
+
+   queryNodeNum := conf.Config.Master.QueryNodeNum
+   topicNum := conf.Config.Pulsar.TopicNum
+   var topicNumPerQueryNode int
+
+   if topicNum%queryNodeNum != 0 {
+       topicNumPerQueryNode = topicNum/queryNodeNum + 1
+   } else {
+       topicNumPerQueryNode = topicNum / queryNodeNum
+   }
+
+   fmt.Println("QueryNodeNum = ", queryNodeNum)
+   fmt.Println("TopicNum = ", topicNum)
+   fmt.Println("TopicNumPerQueryNode = ", topicNumPerQueryNode)
+
+   sIDs := make([]uint64, queryNodeNum)
+
+   for i := 0; i < queryNodeNum; i++ {
+       // For generating different id
+       time.Sleep(1000 * time.Millisecond)
+
+       sIDs[i] = id.New().Uint64()
+   }
+
    c := mock.NewCollection(cID, collection.CollectionName,
-       time.Now(), fieldMetas, []uint64{sID},
+       time.Now(), fieldMetas, sIDs,
        []string{"default"})
    cm := mock.GrpcMarshal(&c)
-   s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))
+
    collectionData, err := mock.Collection2JSON(*cm)
    if err != nil {
        log.Fatal(err)
        return err
    }
-   segmentData, err := mock.Segment2JSON(s)
-   if err != nil {
-       log.Fatal(err)
-       return err
-   }
+
    err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
    if err != nil {
        log.Fatal(err)
        return err
    }
-   err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
-   if err != nil {
-       log.Fatal(err)
-       return err
+
+   for i := 0; i < queryNodeNum; i++ {
+       chStart := i * topicNumPerQueryNode
+       chEnd := (i + 1) * topicNumPerQueryNode
+       if chEnd > topicNum {
+           chEnd = topicNum - 1
+       }
+       s := mock.NewSegment(sIDs[i], cID, collection.CollectionName, "default", chStart, chEnd, GetPhysicalTimeNow(), 1<<46-1)
+
+       segmentData, err := mock.Segment2JSON(s)
+       if err != nil {
+           log.Fatal(err)
+           return err
+       }
+
+       err = kvbase.Save("segment/"+strconv.FormatUint(sIDs[i], 10), segmentData)
+       if err != nil {
+           log.Fatal(err)
+           return err
+       }
    }
+
    return nil
 }
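WriteCollection2Datastore now creates one segment per query node, splitting the Pulsar topics into contiguous channel ranges via a ceiling division, with the last range clamped to topicNum-1. A sketch of the ranges this produces (the 128-topic / 3-node figures are assumed; `topicend: 128` appears in the config above):

```go
package main

import "fmt"

// channelRanges reproduces the split in WriteCollection2Datastore: a ceiling
// division over queryNodeNum, with the last range clamped to topicNum-1.
func channelRanges(topicNum, queryNodeNum int) [][2]int {
    per := topicNum / queryNodeNum
    if topicNum%queryNodeNum != 0 {
        per++
    }
    ranges := make([][2]int, 0, queryNodeNum)
    for i := 0; i < queryNodeNum; i++ {
        chStart := i * per
        chEnd := (i + 1) * per
        if chEnd > topicNum {
            chEnd = topicNum - 1
        }
        ranges = append(ranges, [2]int{chStart, chEnd})
    }
    return ranges
}

func main() {
    fmt.Println(channelRanges(128, 3)) // [[0 43] [43 86] [86 127]]
}
```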
diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go
index bda1a9638d..f325a0f68e 100644
--- a/reader/read_node/meta.go
+++ b/reader/read_node/meta.go
@@ -36,9 +36,9 @@ func GetSegmentObjId(key string) string {
 func isCollectionObj(key string) bool {
    prefix := path.Join(conf.Config.Etcd.Rootpath, CollectonPrefix) + "/"
    prefix = strings.TrimSpace(prefix)
-   println("prefix is :$", prefix)
+   // println("prefix is :$", prefix)
    index := strings.Index(key, prefix)
-   println("index is :", index)
+   // println("index is :", index)
    return index == 0
 }
 
@@ -54,8 +54,15 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *mock.Segment) bool {
        log.Printf("Illegal segment channel range")
        return false
    }
-   // TODO: add query node channel range check
-   return true
+
+   var queryNodeChannelStart = conf.Config.Reader.TopicStart
+   var queryNodeChannelEnd = conf.Config.Reader.TopicEnd
+
+   if segment.ChannelStart >= queryNodeChannelStart && segment.ChannelEnd <= queryNodeChannelEnd {
+       return true
+   }
+
+   return false
 }
 
 func printCollectionStruct(obj *mock.Collection) {
@@ -104,10 +111,9 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
    }
    printSegmentStruct(segment)
 
-   // TODO: fix this after channel range config finished
-   //if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
-   //  return
-   //}
+   if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
+       return
+   }
 
    collection := node.GetCollectionByID(segment.CollectionID)
    if collection != nil {
@@ -125,7 +131,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
 }
 
 func (node *QueryNode) processCreate(key string, msg string) {
-   println("process create", key, ":", msg)
+   println("process create", key)
    if isCollectionObj(key) {
        objID := GetCollectionObjId(key)
        node.processCollectionCreate(objID, msg)
@@ -138,19 +144,18 @@ func (node *QueryNode) processCreate(key string, msg string) {
 }
 
 func (node *QueryNode) processSegmentModify(id string, value string) {
-   println("Modify Segment: ", id)
+   // println("Modify Segment: ", id)
 
    segment, err := mock.JSON2Segment(value)
    if err != nil {
        println("error of json 2 segment")
        println(err.Error())
    }
-   printSegmentStruct(segment)
+   // printSegmentStruct(segment)
 
-   // TODO: fix this after channel range config finished
-   //if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
-   //  return
-   //}
+   if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
+       return
+   }
 
    seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64
    if seg != nil {
@@ -159,13 +164,13 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
 }
 
 func (node *QueryNode) processCollectionModify(id string, value string) {
-   println("Modify Collection: ", id)
+   // println("Modify Collection: ", id)
 
    collection, err := mock.JSON2Collection(value)
    if err != nil {
        println("error of json 2 collection")
        println(err.Error())
    }
-   printCollectionStruct(collection)
+   // printCollectionStruct(collection)
 
    goCollection := node.GetCollectionByID(collection.ID)
    if goCollection != nil {
@@ -175,7 +180,7 @@ func (node *QueryNode) processCollectionModify(id string, value string) {
 }
 
 func (node *QueryNode) processModify(key string, msg string) {
-   println("process modify")
+   // println("process modify")
    if isCollectionObj(key) {
        objID := GetCollectionObjId(key)
        node.processCollectionModify(objID, msg)
@@ -214,7 +219,7 @@ func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
    if err != nil {
        return err
    }
-   println("processResp!!!!!\n")
+   // println("processResp!!!!!\n")
 
    for _, ev := range resp.Events {
        if ev.IsCreate() {
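The reader now enforces the channel-range check that was previously stubbed out: a segment is loaded only if its [ChannelStart, ChannelEnd] interval sits fully inside the query node's configured topic range. A tiny sketch of the containment test (range values invented):

```go
package main

import "fmt"

// inRange mirrors isSegmentChannelRangeInQueryNodeChannelRange: the segment's
// channel interval must sit fully inside the node's [topicStart, topicEnd].
func inRange(segStart, segEnd, topicStart, topicEnd int) bool {
    if segStart > segEnd { // illegal segment range
        return false
    }
    return segStart >= topicStart && segEnd <= topicEnd
}

func main() {
    // Node configured for topics [0, 128]; segment covers channels 43-86.
    fmt.Println(inRange(43, 86, 0, 128))  // true
    fmt.Println(inRange(86, 129, 0, 128)) // false: spills past the node's range
}
```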
diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index fd6f9a8ff6..a0e75158d5 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -16,7 +16,6 @@ import "C"
 import (
    "encoding/json"
    "fmt"
-   "github.com/czs007/suvlim/conf"
    "github.com/stretchr/testify/assert"
    "log"
    "sort"
@@ -333,13 +332,17 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
            node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
            node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
            fmt.Println("Do Search...")
-           var status = node.Search(node.messageClient.SearchMsg)
-           if status.ErrorCode != 0 {
-               fmt.Println("Search Failed")
-               node.PublishFailedSearchResult()
+           for {
+               if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
+                   var status = node.Search(node.messageClient.SearchMsg)
+                   if status.ErrorCode != 0 {
+                       fmt.Println("Search Failed")
+                       node.PublishFailedSearchResult()
+                   }
+                   break
+               }
            }
        default:
-
        }
    }
    wg.Done()
@@ -580,8 +583,8 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
    // So the ServiceTimeSync is always less than searchTimestamp.
    // Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
    // Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
-   var logicTimestamp = searchTimestamp << 46 >> 46
-   searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
+   // var logicTimestamp = searchTimestamp << 46 >> 46
+   // searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
 
    var vector = msg.Records
    // We now only the first Json is valid.
@@ -590,7 +593,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
    // 1. Timestamp check
    // TODO: return or wait? Or adding graceful time
    if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync {
-       fmt.Println("Invalid query time, timestamp = ", searchTimestamp, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync)
+       fmt.Println("Invalid query time, timestamp = ", searchTimestamp>>18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync>>18)
        return msgPb.Status{ErrorCode: 1}
    }
 
@@ -599,6 +602,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 
    // 3. Do search in all segments
    for _, segment := range node.SegmentsMap {
+       fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount())
        var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
        if err != nil {
            fmt.Println(err.Error())
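RunSearch now holds a search message until the node's ServiceTimeSync passes the message timestamp, so every insert up to the search time is visible before the query runs. Note the diff's loop spins on that comparison; a sketch of the same gate that yields between polls instead of busy-spinning (names simplified, not the PR's code):

```go
package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// waitUntilServiceable blocks until serviceTime (updated by the time-sync
// loop) passes ts, mirroring RunSearch's gate — but sleeping between polls
// rather than spinning on the comparison.
func waitUntilServiceable(serviceTime *uint64, ts uint64) {
    for atomic.LoadUint64(serviceTime) <= ts {
        time.Sleep(time.Millisecond)
    }
}

func main() {
    var serviceTime uint64
    go func() { // stand-in for the time-sync updater
        for i := uint64(0); ; i++ {
            atomic.StoreUint64(&serviceTime, i)
            time.Sleep(time.Millisecond)
        }
    }()
    waitUntilServiceable(&serviceTime, 42)
    fmt.Println("search timestamp 42 is now serviceable")
}
```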
diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go
index 1dac55875c..f6f8044c3c 100644
--- a/reader/read_node/segment.go
+++ b/reader/read_node/segment.go
@@ -83,15 +83,16 @@ func (s *Segment) CloseSegment(collection* Collection) error {
    }
 
    // Build index after closing segment
-   s.SegmentStatus = SegmentIndexing
-   fmt.Println("Building index...")
-   s.buildIndex(collection)
+   //s.SegmentStatus = SegmentIndexing
+   //fmt.Println("Building index...")
+   //s.buildIndex(collection)
 
    // TODO: remove redundant segment indexed status
    // Change segment status to indexed
-   s.SegmentStatus = SegmentIndexed
-   fmt.Println("Segment closed and indexed")
+   //s.SegmentStatus = SegmentIndexed
+   //fmt.Println("Segment closed and indexed")
 
+   fmt.Println("Segment closed")
    return nil
 }
diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go
index 75380be6ae..b7a2eed148 100644
--- a/reader/read_node/segment_service.go
+++ b/reader/read_node/segment_service.go
@@ -19,11 +19,10 @@ func (node *QueryNode) SegmentsManagement() {
    for _, partition := range collection.Partitions {
        for _, segment := range partition.Segments {
            if segment.SegmentStatus != SegmentOpened {
-               log.Println("Segment have been closed")
                continue
            }
 
-           fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
+           // fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
            if timeNow >= segment.SegmentCloseTime {
                go segment.CloseSegment(collection)
            }