From 84f3d974faa96a7db819ae6fd4db07b9dbedc5bf Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 23 Nov 2020 18:04:09 +0800 Subject: [PATCH] Delete pulsar address test and refactor master param table Signed-off-by: bigsheeper --- cmd/master/main.go | 38 +- configs/advanced/master.yaml | 1 + configs/config.yaml | 1 + internal/core/src/query/CMakeLists.txt | 1 + internal/core/src/query/Search.cpp | 107 +++++ internal/core/src/query/Search.h | 19 + .../src/query/generated/ExecExprVisitor.h | 3 +- .../src/query/visitors/ExecExprVisitor.cpp | 5 +- .../query/visitors/ExecPlanNodeVisitor.cpp | 9 +- internal/core/src/segcore/AckResponder.h | 1 - internal/core/src/segcore/DeletedRecord.h | 1 + internal/core/src/segcore/IndexingEntry.h | 2 +- internal/core/src/segcore/InsertRecord.h | 1 + internal/core/src/segcore/Record.h | 21 + internal/core/src/segcore/SegmentBase.h | 5 +- internal/core/src/segcore/SegmentNaive.cpp | 17 - internal/core/src/segcore/SegmentNaive.h | 6 +- .../core/src/segcore/SegmentSmallIndex.cpp | 131 ------ internal/core/src/segcore/SegmentSmallIndex.h | 27 +- internal/master/collection_task_test.go | 55 +-- internal/master/global_allocator_test.go | 2 +- internal/master/grpc_service_test.go | 53 ++- internal/master/master.go | 74 ++-- internal/master/meta_table_test.go | 8 +- internal/master/param_table.go | 398 ++++++++++++++++++ internal/master/param_table_test.go | 143 +++++++ internal/master/paramtable.go | 216 ---------- internal/master/partition_task_test.go | 53 ++- internal/master/segment_manager.go | 66 +-- internal/master/segment_manager_test.go | 91 ++-- internal/master/time_snyc_producer_test.go | 2 +- internal/master/timesync_test.go | 2 +- internal/proxy/proxy_test.go | 35 +- 33 files changed, 942 insertions(+), 652 deletions(-) create mode 100644 internal/core/src/query/Search.cpp create mode 100644 internal/core/src/query/Search.h create mode 100644 internal/core/src/segcore/Record.h create mode 100644 internal/master/param_table.go create mode 100644 internal/master/param_table_test.go delete mode 100644 internal/master/paramtable.go diff --git a/cmd/master/main.go b/cmd/master/main.go index 64597f02bf..69ef9b51a1 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -17,41 +17,7 @@ func main() { // Creates server. 
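+	// NOTE: CreateServer and svr.Run now read their settings from the
+	// package-level master.Params table (initialized via master.Init)
+	// instead of an Option struct assembled here.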
	ctx, cancel := context.WithCancel(context.Background())
-	etcdAddress := master.Params.EtcdAddress()
-	etcdRootPath := master.Params.EtcdRootPath()
-	pulsarAddr := master.Params.PulsarAddress()
-	defaultRecordSize := master.Params.DefaultRecordSize()
-	minimumAssignSize := master.Params.MinimumAssignSize()
-	segmentThreshold := master.Params.SegmentThreshold()
-	segmentExpireDuration := master.Params.SegmentExpireDuration()
-	numOfChannel := master.Params.TopicNum()
-	nodeNum, _ := master.Params.QueryNodeNum()
-	statsChannel := master.Params.StatsChannels()
-
-	opt := master.Option{
-		KVRootPath:            etcdRootPath,
-		MetaRootPath:          etcdRootPath,
-		EtcdAddr:              []string{etcdAddress},
-		PulsarAddr:            pulsarAddr,
-		ProxyIDs:              master.Params.ProxyIDList(),
-		PulsarProxyChannels:   master.Params.ProxyTimeSyncChannels(),
-		PulsarProxySubName:    master.Params.ProxyTimeSyncSubName(),
-		SoftTTBInterval:       master.Params.SoftTimeTickBarrierInterval(),
-		WriteIDs:              master.Params.WriteIDList(),
-		PulsarWriteChannels:   master.Params.WriteTimeSyncChannels(),
-		PulsarWriteSubName:    master.Params.WriteTimeSyncSubName(),
-		PulsarDMChannels:      master.Params.DMTimeSyncChannels(),
-		PulsarK2SChannels:     master.Params.K2STimeSyncChannels(),
-		DefaultRecordSize:     defaultRecordSize,
-		MinimumAssignSize:     minimumAssignSize,
-		SegmentThreshold:      segmentThreshold,
-		SegmentExpireDuration: segmentExpireDuration,
-		NumOfChannel:          numOfChannel,
-		NumOfQueryNode:        nodeNum,
-		StatsChannels:         statsChannel,
-	}
-
-	svr, err := master.CreateServer(ctx, &opt)
+	svr, err := master.CreateServer(ctx)
 	if err != nil {
 		log.Print("create server failed", zap.Error(err))
 	}
@@ -69,7 +35,7 @@ func main() {
 		cancel()
 	}()
 
-	if err := svr.Run(int64(master.Params.Port())); err != nil {
+	if err := svr.Run(int64(master.Params.Port)); err != nil {
 		log.Fatal("run server failed", zap.Error(err))
 	}
diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml
index fda193fe72..e0837042d6 100644
--- a/configs/advanced/master.yaml
+++ b/configs/advanced/master.yaml
@@ -15,6 +15,7 @@ master:
   segment:
     # old name: segmentThreshold: 536870912
     size: 512 # MB
+    sizeFactor: 0.75
     defaultSizePerRecord: 1024
     minIDAssignCnt: 1024
     maxIDAssignCnt: 16384
diff --git a/configs/config.yaml b/configs/config.yaml
index 87f4075fe0..7422ecefc8 100644
--- a/configs/config.yaml
+++ b/configs/config.yaml
@@ -31,6 +31,7 @@ master:
   minimumAssignSize: 1048576
   segmentThreshold: 536870912
   segmentExpireDuration: 2000
+  segmentThresholdFactor: 0.75
   querynodenum: 1
   writenodenum: 1
   statsChannels: "statistic"
diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt
index 395bb345d4..9cd8c684af 100644
--- a/internal/core/src/query/CMakeLists.txt
+++ b/internal/core/src/query/CMakeLists.txt
@@ -8,6 +8,7 @@ set(MILVUS_QUERY_SRCS
         visitors/ShowExprVisitor.cpp
         visitors/ExecExprVisitor.cpp
         Plan.cpp
+        Search.cpp
         )
 add_library(milvus_query ${MILVUS_QUERY_SRCS})
 target_link_libraries(milvus_query milvus_proto)
diff --git a/internal/core/src/query/Search.cpp b/internal/core/src/query/Search.cpp
new file mode 100644
index 0000000000..1a38d1dea3
--- /dev/null
+++ b/internal/core/src/query/Search.cpp
@@ -0,0 +1,107 @@
+#include "Search.h"
+#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
+#include <faiss/utils/distances.h>
+#include "segcore/Reduce.h"
+
+#include <boost/dynamic_bitset.hpp>
+#include "utils/tools.h"
+
+namespace milvus::query {
+static faiss::ConcurrentBitsetPtr
+create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk_id) {
+    if (!bitmaps_opt.has_value()) {
+        return nullptr;
+    }
+    auto& bitmaps = *bitmaps_opt.value();
+    auto& src_vec = bitmaps.at(chunk_id);
+    auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
+    boost::to_block_range(src_vec, dst->mutable_data());
+    return dst;
+}
+
+using namespace segcore;
+Status
+QueryBruteForceImpl(const SegmentSmallIndex& segment,
+                    const query::QueryInfo& info,
+                    const float* query_data,
+                    int64_t num_queries,
+                    Timestamp timestamp,
+                    std::optional<const BitmapSimple*> bitmaps_opt,
+                    QueryResult& results) {
+    auto& record = segment.get_insert_record();
+    auto& schema = segment.get_schema();
+    auto& indexing_record = segment.get_indexing_record();
+    // step 1: binary search to find the barrier of the snapshot
+    auto ins_barrier = get_barrier(record, timestamp);
+    auto max_chunk = upper_div(ins_barrier, DefaultElementPerChunk);
+    // auto del_barrier = get_barrier(deleted_record_, timestamp);
+
+#if 0
+    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
+    Assert(bitmap_holder);
+    auto bitmap = bitmap_holder->bitmap_ptr;
+#endif
+
+    // step 2.1: get meta
+    // step 2.2: get which vector field to search
+    auto vecfield_offset_opt = schema.get_offset(info.field_id_);
+    Assert(vecfield_offset_opt.has_value());
+    auto vecfield_offset = vecfield_offset_opt.value();
+    auto& field = schema[vecfield_offset];
+    auto vec_ptr = record.get_vec_entity<float>(vecfield_offset);
+
+    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+    auto dim = field.get_dim();
+    auto topK = info.topK_;
+    auto total_count = topK * num_queries;
+    // TODO: optimize
+
+    // step 3: small indexing search
+    std::vector<int64_t> final_uids(total_count, -1);
+    std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
+
+    auto max_indexed_id = indexing_record.get_finished_ack();
+    const auto& indexing_entry = indexing_record.get_indexing(vecfield_offset);
+    auto search_conf = indexing_entry.get_search_conf(topK);
+
+    for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
+        auto indexing = indexing_entry.get_indexing(chunk_id);
+        auto src_data = vec_ptr->get_chunk(chunk_id).data();
+        auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
+        auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
+        auto ans = indexing->Query(dataset, search_conf, bitmap_view);
+        auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
+        auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
+        merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
+    }
+
+    // step 4: brute force search where small indexing is unavailable
+    for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
+        std::vector<int64_t> buf_uids(total_count, -1);
+        std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
+
+        faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
+        auto src_data = vec_ptr->get_chunk(chunk_id).data();
+        auto nsize =
+            chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
+        auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
+        faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf, bitmap_view);
+        merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
+    }
+
+    // step 5: convert offset to uids
+    for (auto& id : final_uids) {
+        if (id == -1) {
+            continue;
+        }
+        id = record.uids_[id];
+    }
+
+    results.result_ids_ = std::move(final_uids);
+    results.result_distances_ = std::move(final_dis);
+    results.topK_ = topK;
+    results.num_queries_ = num_queries;
+
+    return Status::OK();
+}
+}  // namespace milvus::query
diff --git a/internal/core/src/query/Search.h b/internal/core/src/query/Search.h
new file mode 100644
index 0000000000..0febf74a49
--- /dev/null
+++ b/internal/core/src/query/Search.h
@@ -0,0 +1,19 @@
+#pragma once
+#include <optional>
+#include "segcore/SegmentSmallIndex.h"
+#include <boost/dynamic_bitset.hpp>
+
+namespace milvus::query {
+using BitmapChunk = boost::dynamic_bitset<>;
+using BitmapSimple = std::deque<BitmapChunk>;
+
+// note: c++17 doesn't support optional references
+Status
+QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment,
+                    const QueryInfo& info,
+                    const float* query_data,
+                    int64_t num_queries,
+                    Timestamp timestamp,
+                    std::optional<const BitmapSimple*> bitmap_opt,
+                    segcore::QueryResult& results);
+}  // namespace milvus::query
diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h
index 2e173289ab..e5327f7e71 100644
--- a/internal/core/src/query/generated/ExecExprVisitor.h
+++ b/internal/core/src/query/generated/ExecExprVisitor.h
@@ -4,6 +4,7 @@
 #include "segcore/SegmentSmallIndex.h"
 #include <optional>
 #include "query/ExprImpl.h"
+#include "boost/dynamic_bitset.hpp"
 #include "ExprVisitor.h"
 
 namespace milvus::query {
@@ -22,7 +23,7 @@ class ExecExprVisitor : ExprVisitor {
     visit(RangeExpr& expr) override;
 
 public:
-    using RetType = std::vector<boost::dynamic_bitset<>>;
+    using RetType = std::deque<boost::dynamic_bitset<>>;
     explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
     }
     RetType
diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp
index 43b6971200..889485393e 100644
--- a/internal/core/src/query/visitors/ExecExprVisitor.cpp
+++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp
@@ -1,6 +1,7 @@
 #include "segcore/SegmentSmallIndex.h"
 #include <optional>
 #include "query/ExprImpl.h"
+#include "boost/dynamic_bitset.hpp"
 #include "query/generated/ExecExprVisitor.h"
 
 namespace milvus::query {
@@ -10,7 +11,7 @@ namespace impl {
 class ExecExprVisitor : ExprVisitor {
 public:
-    using RetType = std::vector<boost::dynamic_bitset<>>;
+    using RetType = std::deque<boost::dynamic_bitset<>>;
     explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
     }
     RetType
@@ -66,7 +67,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, Func func) -> RetType {
     auto& field_meta = schema[field_offset];
     auto vec_ptr = records.get_scalar_entity<T>(field_offset);
     auto& vec = *vec_ptr;
-    std::vector<boost::dynamic_bitset<>> results(vec.chunk_size());
+    RetType results(vec.chunk_size());
     for (auto chunk_id = 0; chunk_id < vec.chunk_size(); ++chunk_id) {
         auto& result = results[chunk_id];
         result.resize(segcore::DefaultElementPerChunk);
diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
index 14cc43c99b..f35f7e9367 100644
--- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
+++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
@@ -3,6 +3,8 @@
 #include "segcore/SegmentBase.h"
 #include "query/generated/ExecPlanNodeVisitor.h"
 #include "segcore/SegmentSmallIndex.h"
+#include "query/generated/ExecExprVisitor.h"
+#include "query/Search.h"
 
 namespace milvus::query {
 
@@ -49,7 +51,13 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
     auto& ph = placeholder_group_.at(0);
     auto src_data = ph.get_blob<float>();
     auto num_queries = ph.num_of_queries_;
-    segment->QueryBruteForceImpl(node.query_info_, src_data, num_queries, timestamp_, ret);
+    if (node.predicate_.has_value()) {
+        auto bitmap = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
+        auto ptr = &bitmap;
+        QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, ptr, ret);
+    } else {
+        QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, std::nullopt, ret);
+    }
 
     ret_ = ret;
 }
diff --git a/internal/core/src/segcore/AckResponder.h b/internal/core/src/segcore/AckResponder.h
index 894a2dd237..bb12db60e4 100644
--- a/internal/core/src/segcore/AckResponder.h
+++ b/internal/core/src/segcore/AckResponder.h
@@ -37,6 +37,5 @@ class AckResponder {
     std::shared_mutex mutex_;
     std::set<int64_t> acks_ = {0};
     std::atomic<int64_t> minimum_ = 0;
-    // std::atomic<int64_t> maximum_ = 0;
 };
 }  // namespace milvus::segcore
diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h
index 54e35e9275..3edbcc9c40 100644
--- a/internal/core/src/segcore/DeletedRecord.h
+++ b/internal/core/src/segcore/DeletedRecord.h
@@ -4,6 +4,7 @@
 #include "common/Schema.h"
 #include "knowhere/index/vector_index/IndexIVF.h"
 #include <memory>
+#include "segcore/Record.h"
 
 namespace milvus::segcore {
 
diff --git a/internal/core/src/segcore/IndexingEntry.h b/internal/core/src/segcore/IndexingEntry.h
index 8823602b39..6cc61df36d 100644
--- a/internal/core/src/segcore/IndexingEntry.h
+++ b/internal/core/src/segcore/IndexingEntry.h
@@ -67,7 +67,7 @@ class IndexingRecord {
 
     // concurrent
     int64_t
-    get_finished_ack() {
+    get_finished_ack() const {
         return finished_ack_.GetAck();
     }
 
diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h
index a34ecf02eb..e565a1aaca 100644
--- a/internal/core/src/segcore/InsertRecord.h
+++ b/internal/core/src/segcore/InsertRecord.h
@@ -2,6 +2,7 @@
 #include "common/Schema.h"
 #include "ConcurrentVector.h"
 #include "AckResponder.h"
+#include "segcore/Record.h"
 
 namespace milvus::segcore {
 struct InsertRecord {
diff --git a/internal/core/src/segcore/Record.h b/internal/core/src/segcore/Record.h
new file mode 100644
index 0000000000..8e35ef4bc3
--- /dev/null
+++ b/internal/core/src/segcore/Record.h
@@ -0,0 +1,21 @@
+#pragma once
+#include "common/Schema.h"
+
+namespace milvus::segcore {
+template <typename RecordType>
+inline int64_t
+get_barrier(const RecordType& record, Timestamp timestamp) {
+    auto& vec = record.timestamps_;
+    int64_t beg = 0;
+    int64_t end = record.ack_responder_.GetAck();
+    while (beg < end) {
+        auto mid = (beg + end) / 2;
+        if (vec[mid] < timestamp) {
+            beg = mid + 1;
+        } else {
+            end = mid;
+        }
+    }
+    return beg;
+}
+}  // namespace milvus::segcore
diff --git a/internal/core/src/segcore/SegmentBase.h b/internal/core/src/segcore/SegmentBase.h
index faea669804..1e870c8d61 100644
--- a/internal/core/src/segcore/SegmentBase.h
+++ b/internal/core/src/segcore/SegmentBase.h
@@ -52,10 +52,7 @@ class SegmentBase {
     virtual Status
     Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
 
-    // query contains metadata of
-    virtual Status
-
QueryDeprecated(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results) = 0;
-
+ public:
     virtual Status
     Search(const query::Plan* Plan,
           const query::PlaceholderGroup* placeholder_groups[],
diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp
index 917b4b524a..eab6b6a508 100644
--- a/internal/core/src/segcore/SegmentNaive.cpp
+++ b/internal/core/src/segcore/SegmentNaive.cpp
@@ -219,23 +219,6 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_r
     // return Status::OK();
 }
 
-template <typename RecordType>
-int64_t
-get_barrier(const RecordType& record, Timestamp timestamp) {
-    auto& vec = record.timestamps_;
-    int64_t beg = 0;
-    int64_t end = record.ack_responder_.GetAck();
-    while (beg < end) {
-        auto mid = (beg + end) / 2;
-        if (vec[mid] < timestamp) {
-            beg = mid + 1;
-        } else {
-            end = mid;
-        }
-    }
-    return beg;
-}
-
 Status
 SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
     auto ins_barrier = get_barrier(record_, timestamp);
diff --git a/internal/core/src/segcore/SegmentNaive.h b/internal/core/src/segcore/SegmentNaive.h
index a4e35afb56..6a4ced97ca 100644
--- a/internal/core/src/segcore/SegmentNaive.h
+++ b/internal/core/src/segcore/SegmentNaive.h
@@ -41,10 +41,12 @@ class SegmentNaive : public SegmentBase {
     Status
     Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
 
-    // query contains metadata of
+ private:
+    // NOTE: now deprecated, remains for further copy out
     Status
-    QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
+    QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results);
 
+ public:
     Status
     Search(const query::Plan* Plan,
           const query::PlaceholderGroup* placeholder_groups[],
diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp
index 47248e75c9..3af53741d6 100644
--- a/internal/core/src/segcore/SegmentSmallIndex.cpp
+++ b/internal/core/src/segcore/SegmentSmallIndex.cpp
@@ -204,137 +204,6 @@ SegmentSmallIndex::Delete(int64_t reserved_begin,
     // return Status::OK();
 }
 
-template <typename RecordType>
-int64_t
-get_barrier(const RecordType& record, Timestamp timestamp) {
-    auto& vec = record.timestamps_;
-    int64_t beg = 0;
-    int64_t end = record.ack_responder_.GetAck();
-    while (beg < end) {
-        auto mid = (beg + end) / 2;
-        if (vec[mid] < timestamp) {
-            beg = mid + 1;
-        } else {
-            end = mid;
-        }
-    }
-    return beg;
-}
-
-Status
-SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info,
-                                       const float* query_data,
-                                       int64_t num_queries,
-                                       Timestamp timestamp,
-                                       QueryResult& results) {
-    // step 1: binary search to find the barrier of the snapshot
-    auto ins_barrier = get_barrier(record_, timestamp);
-    // auto del_barrier = get_barrier(deleted_record_, timestamp);
-#if 0
-    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
-    Assert(bitmap_holder);
-    auto bitmap = bitmap_holder->bitmap_ptr;
-#endif
-
-    // step 2.1: get meta
-    // step 2.2: get which vector field to search
-    auto vecfield_offset_opt = schema_->get_offset(info.field_id_);
-    Assert(vecfield_offset_opt.has_value());
-    auto vecfield_offset = vecfield_offset_opt.value();
-    Assert(vecfield_offset < record_.entity_vec_.size());
-
-    auto& field = schema_->operator[](vecfield_offset);
-    auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(vecfield_offset));
-
-    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
-    auto dim = field.get_dim();
-    auto topK = info.topK_;
-    auto total_count = topK * num_queries;
-    // TODO: optimize
-
-    // step 3: small indexing search
-    std::vector<int64_t> final_uids(total_count, -1);
-    std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
-
-    auto max_chunk = (ins_barrier + DefaultElementPerChunk - 1) / DefaultElementPerChunk;
-
-    auto max_indexed_id = indexing_record_.get_finished_ack();
-    const auto& indexing_entry = indexing_record_.get_indexing(vecfield_offset);
-    auto search_conf = indexing_entry.get_search_conf(topK);
-
-    for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
-        auto indexing = indexing_entry.get_indexing(chunk_id);
-        auto src_data = vec_ptr->get_chunk(chunk_id).data();
-        auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
-        auto ans = indexing->Query(dataset, search_conf, nullptr);
-        auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
-        auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
-        merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
-    }
-
-    // step 4: brute force search where small indexing is unavailable
-    for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
-        std::vector<int64_t> buf_uids(total_count, -1);
-        std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
-
-        faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
-
-        auto src_data = vec_ptr->get_chunk(chunk_id).data();
-        auto nsize =
-            chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
-        faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf);
-        merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
-    }
-
-    // step 5: convert offset to uids
-    for (auto& id : final_uids) {
-        if (id == -1) {
-            continue;
-        }
-        id = record_.uids_[id];
-    }
-
-    results.result_ids_ = std::move(final_uids);
-    results.result_distances_ = std::move(final_dis);
-    results.topK_ = topK;
-    results.num_queries_ = num_queries;
-
-    // throw std::runtime_error("unimplemented");
-    return Status::OK();
-}
-
-Status
-SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
-    // TODO: enable delete
-    // TODO: enable index
-    // TODO: remove mock
-    if (query_info == nullptr) {
-        query_info = std::make_shared<query::QueryDeprecated>();
-        query_info->field_name = "fakevec";
-        query_info->topK = 10;
-        query_info->num_queries = 1;
-
-        auto dim = schema_->operator[]("fakevec").get_dim();
-        std::default_random_engine e(42);
-        std::uniform_real_distribution<> dis(0.0, 1.0);
-        query_info->query_raw_data.resize(query_info->num_queries * dim);
-        for (auto& x : query_info->query_raw_data) {
-            x = dis(e);
-        }
-    }
-    // TODO
-    query::QueryInfo info{
-        query_info->topK,
-        query_info->field_name,
-        "L2",
-        nlohmann::json{
-            {"nprobe", 10},
-        },
-    };
-    auto num_queries = query_info->num_queries;
-    return QueryBruteForceImpl(info, query_info->query_raw_data.data(), num_queries, timestamp, result);
-}
-
 Status
 SegmentSmallIndex::Close() {
     if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
diff --git a/internal/core/src/segcore/SegmentSmallIndex.h b/internal/core/src/segcore/SegmentSmallIndex.h
index 9bb3030e89..70541f8631 100644
--- a/internal/core/src/segcore/SegmentSmallIndex.h
+++ b/internal/core/src/segcore/SegmentSmallIndex.h
@@ -65,10 +65,6 @@
     Status
     Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
 
-    // query contains metadata of
-    Status
-    QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
-
     Status
     Search(const query::Plan* Plan,
           const query::PlaceholderGroup* placeholder_groups[],
@@ -112,6 +108,16 @@
         return record_;
     }
 
+    const IndexingRecord&
+    get_indexing_record() const {
+        return indexing_record_;
+    }
+
+    const DeletedRecord&
+    get_deleted_record() const {
+        return deleted_record_;
+    }
+
     const Schema&
     get_schema() const {
         return *schema_;
@@ -148,12 +154,12 @@
     //    Status
     //    QueryBruteForceImpl(query::QueryPtr query, Timestamp timestamp, QueryResult& results);
 
-    Status
-    QueryBruteForceImpl(const query::QueryInfo& info,
-                        const float* query_data,
-                        int64_t num_queries,
-                        Timestamp timestamp,
-                        QueryResult& results);
+    //    Status
+    //    QueryBruteForceImpl(const query::QueryInfo& info,
+    //                        const float* query_data,
+    //                        int64_t num_queries,
+    //                        Timestamp timestamp,
+    //                        QueryResult& results);
 
     template <typename Type>
     knowhere::IndexPtr
@@ -172,4 +178,5 @@
     // std::unordered_map<std::string, knowhere::IndexPtr> indexings_;  // index_name => indexing
     tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
 };
+
 }  // namespace milvus::segcore
diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go
index a48d4e6fa6..d1fa330aed 100644
--- a/internal/master/collection_task_test.go
+++ b/internal/master/collection_task_test.go
@@ -23,37 +23,44 @@ func TestMaster_CollectionTask(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 
-	etcdAddr := Params.EtcdAddress()
-
-	etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
 	assert.Nil(t, err)
 	_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
 	assert.Nil(t, err)
 
-	opt := Option{
-		KVRootPath:            "/test/root/kv",
-		MetaRootPath:          "/test/root/meta",
-		EtcdAddr:              []string{etcdAddr},
-		PulsarAddr:            Params.PulsarAddress(),
-		ProxyIDs:              []typeutil.UniqueID{1, 2},
-		PulsarProxyChannels:   []string{"proxy1", "proxy2"},
-		PulsarProxySubName:    "proxyTopics",
-		SoftTTBInterval:       300,
-		WriteIDs:              []typeutil.UniqueID{3, 4},
-		PulsarWriteChannels:   []string{"write3", "write4"},
-		PulsarWriteSubName:    "writeTopics",
-		PulsarDMChannels:      []string{"dm0", "dm1"},
-		PulsarK2SChannels:     []string{"k2s0", "k2s1"},
+	Params = ParamTable{
+		Address: Params.Address,
+		Port:    Params.Port,
+
+		EtcdAddress:   Params.EtcdAddress,
+		EtcdRootPath:  "/test/root",
+		PulsarAddress: Params.PulsarAddress,
+
+		ProxyIDList:     []typeutil.UniqueID{1, 2},
+		WriteNodeIDList: []typeutil.UniqueID{3, 4},
+
+		TopicNum:                    5,
+		QueryNodeNum:                3,
+		SoftTimeTickBarrierInterval: 300,
+
+		// segment
+		SegmentSize:       536870912 / 1024 / 1024,
+		SegmentSizeFactor: 0.75,
 		DefaultRecordSize: 1024,
-		MinimumAssignSize:     1048576,
-		SegmentThreshold:      536870912,
-		SegmentExpireDuration: 2000,
-		NumOfChannel:          5,
-		NumOfQueryNode:        3,
-		StatsChannels:         "statistic",
+		MinSegIDAssignCnt:     1048576 / 1024,
+		MaxSegIDAssignCnt:     Params.MaxSegIDAssignCnt,
+		SegIDAssignExpiration: 2000,
+
+		// msgChannel
+		ProxyTimeTickChannelNames:     []string{"proxy1", "proxy2"},
+		WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
+		InsertChannelNames:            []string{"dm0", "dm1"},
+		K2SChannelNames:               []string{"k2s0",
"k2s1"}, + QueryNodeStatsChannelName: "statistic", + MsgChannelSubName: Params.MsgChannelSubName, } - svr, err := CreateServer(ctx, &opt) + svr, err := CreateServer(ctx) assert.Nil(t, err) err = svr.Run(10002) assert.Nil(t, err) diff --git a/internal/master/global_allocator_test.go b/internal/master/global_allocator_test.go index 20d4ea4e7b..f33b934dc2 100644 --- a/internal/master/global_allocator_test.go +++ b/internal/master/global_allocator_test.go @@ -15,7 +15,7 @@ var gTestIDAllocator *GlobalIDAllocator func TestMain(m *testing.M) { Params.Init() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) exitCode := m.Run() diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 72541293ca..a32e23efc8 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -21,36 +21,45 @@ func TestMaster_CreateCollection(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - opt := Option{ - KVRootPath: "/test/root/kv", - MetaRootPath: "/test/root/meta", - EtcdAddr: []string{etcdAddr}, - PulsarAddr: Params.PulsarAddress(), - ProxyIDs: []typeutil.UniqueID{1, 2}, - PulsarProxyChannels: []string{"proxy1", "proxy2"}, - PulsarProxySubName: "proxyTopics", - SoftTTBInterval: 300, - WriteIDs: []typeutil.UniqueID{3, 4}, - PulsarWriteChannels: []string{"write3", "write4"}, - PulsarWriteSubName: "writeTopics", - PulsarDMChannels: []string{"dm0", "dm1"}, - PulsarK2SChannels: []string{"k2s0", "k2s1"}, + Params = ParamTable{ + Address: Params.Address, + Port: Params.Port, + + EtcdAddress: Params.EtcdAddress, + EtcdRootPath: "/test/root", + PulsarAddress: Params.PulsarAddress, + + ProxyIDList: []typeutil.UniqueID{1, 2}, + WriteNodeIDList: []typeutil.UniqueID{3, 4}, + + TopicNum: 5, + QueryNodeNum: 3, + SoftTimeTickBarrierInterval: 300, + + // segment + SegmentSize: 536870912 / 1024 / 1024, + SegmentSizeFactor: 0.75, DefaultRecordSize: 1024, - MinimumAssignSize: 1048576, - SegmentThreshold: 536870912, - SegmentExpireDuration: 2000, - NumOfChannel: 5, - NumOfQueryNode: 3, - StatsChannels: "statistic", + MinSegIDAssignCnt: 1048576 / 1024, + MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, + SegIDAssignExpiration: 2000, + + // msgChannel + ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, + WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, + InsertChannelNames: []string{"dm0", "dm1"}, + K2SChannelNames: []string{"k2s0", "k2s1"}, + QueryNodeStatsChannelName: "statistic", + MsgChannelSubName: Params.MsgChannelSubName, } - svr, err := CreateServer(ctx, &opt) + svr, err := CreateServer(ctx) assert.Nil(t, err) err = svr.Run(10001) assert.Nil(t, err) diff --git a/internal/master/master.go b/internal/master/master.go index ebf5a69f73..9f5064080a 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -27,36 +27,6 @@ type ( Timestamp = typeutil.Timestamp ) -type Option struct { - KVRootPath string - MetaRootPath string - EtcdAddr []string - - PulsarAddr string - - ////softTimeTickBarrier - ProxyIDs 
[]typeutil.UniqueID - PulsarProxyChannels []string //TimeTick - PulsarProxySubName string - SoftTTBInterval Timestamp //Physical Time + Logical Time - - //hardTimeTickBarrier - WriteIDs []typeutil.UniqueID - PulsarWriteChannels []string - PulsarWriteSubName string - - PulsarDMChannels []string - PulsarK2SChannels []string - - DefaultRecordSize int64 - MinimumAssignSize int64 - SegmentThreshold float64 - SegmentExpireDuration int64 - NumOfChannel int - NumOfQueryNode int - StatsChannels string -} - type Master struct { // Server state. isServing int64 @@ -105,18 +75,22 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { func Init() { rand.Seed(time.Now().UnixNano()) - Params.InitParamTable() + Params.Init() } // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, opt *Option) (*Master, error) { +func CreateServer(ctx context.Context) (*Master, error) { //Init(etcdAddr, kvRootPath) + etcdAddress := Params.EtcdAddress + metaRootPath := Params.EtcdRootPath + kvRootPath := Params.EtcdRootPath + pulsarAddr := Params.PulsarAddress - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: opt.EtcdAddr}) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) if err != nil { return nil, err } - etcdkv := kv.NewEtcdKV(etcdClient, opt.MetaRootPath) + etcdkv := kv.NewEtcdKV(etcdClient, metaRootPath) metakv, err := NewMetaTable(etcdkv) if err != nil { return nil, err @@ -128,41 +102,41 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { return nil, err } pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream - pulsarProxyStream.SetPulsarClient(opt.PulsarAddr) - pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024) + pulsarProxyStream.SetPulsarClient(pulsarAddr) + pulsarProxyStream.CreatePulsarConsumers(Params.ProxyTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024) pulsarProxyStream.Start() var proxyStream ms.MsgStream = pulsarProxyStream - proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval) + proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval) tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier) pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream - pulsarWriteStream.SetPulsarClient(opt.PulsarAddr) - pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024) + pulsarWriteStream.SetPulsarClient(pulsarAddr) + pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024) pulsarWriteStream.Start() var writeStream ms.MsgStream = pulsarWriteStream - writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs) + writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList) tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier) pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream - pulsarDMStream.SetPulsarClient(opt.PulsarAddr) - pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels) + pulsarDMStream.SetPulsarClient(pulsarAddr) + pulsarDMStream.CreatePulsarProducers(Params.InsertChannelNames) tsMsgProducer.SetDMSyncStream(pulsarDMStream) pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream - 
pulsarK2SStream.SetPulsarClient(opt.PulsarAddr) - pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels) + pulsarK2SStream.SetPulsarClient(pulsarAddr) + pulsarK2SStream.CreatePulsarProducers(Params.K2SChannelNames) tsMsgProducer.SetK2sSyncStream(pulsarK2SStream) // stats msg stream statsMs := ms.NewPulsarMsgStream(ctx, 1024) - statsMs.SetPulsarClient(opt.PulsarAddr) - statsMs.CreatePulsarConsumers([]string{opt.StatsChannels}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024) + statsMs.SetPulsarClient(pulsarAddr) + statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024) statsMs.Start() m := &Master{ ctx: ctx, startTimestamp: time.Now().Unix(), - kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr), + kvBase: newKVBase(kvRootPath, []string{etcdAddress}), metaTable: metakv, timesSyncMsgProducer: tsMsgProducer, grpcErr: make(chan error), @@ -170,19 +144,19 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { } //init idAllocator - m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "gid")) + m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "gid")) if err := m.idAllocator.Initialize(); err != nil { return nil, err } //init tsoAllocator - m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "tso")) + m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "tso")) if err := m.tsoAllocator.Initialize(); err != nil { return nil, err } m.scheduler = NewDDRequestScheduler(func() (UniqueID, error) { return m.idAllocator.AllocOne() }) - m.segmentMgr = NewSegmentManager(metakv, opt, + m.segmentMgr = NewSegmentManager(metakv, func() (UniqueID, error) { return m.idAllocator.AllocOne() }, func() (Timestamp, error) { return m.tsoAllocator.AllocOne() }, ) diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index f3af52e256..674b016259 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -15,7 +15,7 @@ import ( func TestMetaTable_Collection(t *testing.T) { Init() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -152,7 +152,7 @@ func TestMetaTable_Collection(t *testing.T) { func TestMetaTable_DeletePartition(t *testing.T) { Init() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) @@ -243,7 +243,7 @@ func TestMetaTable_DeletePartition(t *testing.T) { func TestMetaTable_Segment(t *testing.T) { Init() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) @@ -324,7 +324,7 @@ func TestMetaTable_Segment(t *testing.T) { func TestMetaTable_UpdateSegment(t *testing.T) { Init() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) diff --git a/internal/master/param_table.go b/internal/master/param_table.go new file mode 100644 index 0000000000..4c03e45c9c --- /dev/null +++ b/internal/master/param_table.go @@ -0,0 +1,398 @@ +package master + +import ( + "log" + "strconv" + 
"strings" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type ParamTable struct { + paramtable.BaseTable + + Address string + Port int + + EtcdAddress string + EtcdRootPath string + PulsarAddress string + + // nodeID + ProxyIDList []typeutil.UniqueID + WriteNodeIDList []typeutil.UniqueID + + TopicNum int + QueryNodeNum int + SoftTimeTickBarrierInterval typeutil.Timestamp + + // segment + SegmentSize float64 + SegmentSizeFactor float64 + DefaultRecordSize int64 + MinSegIDAssignCnt int64 + MaxSegIDAssignCnt int64 + SegIDAssignExpiration int64 + + // msgChannel + ProxyTimeTickChannelNames []string + WriteNodeTimeTickChannelNames []string + InsertChannelNames []string + K2SChannelNames []string + QueryNodeStatsChannelName string + MsgChannelSubName string +} + +var Params ParamTable + +func (p *ParamTable) Init() { + // load yaml + p.BaseTable.Init() + err := p.LoadYaml("milvus.yaml") + if err != nil { + panic(err) + } + err = p.LoadYaml("advanced/channel.yaml") + if err != nil { + panic(err) + } + err = p.LoadYaml("advanced/master.yaml") + if err != nil { + panic(err) + } + + // set members + p.initAddress() + p.initPort() + + p.initEtcdAddress() + p.initEtcdRootPath() + p.initPulsarAddress() + + p.initProxyIDList() + p.initWriteNodeIDList() + + p.initTopicNum() + p.initQueryNodeNum() + p.initSoftTimeTickBarrierInterval() + + p.initSegmentSize() + p.initSegmentSizeFactor() + p.initDefaultRecordSize() + p.initMinSegIDAssignCnt() + p.initMaxSegIDAssignCnt() + p.initSegIDAssignExpiration() + + p.initProxyTimeTickChannelNames() + p.initWriteNodeTimeTickChannelNames() + p.initInsertChannelNames() + p.initK2SChannelNames() + p.initQueryNodeStatsChannelName() + p.initMsgChannelSubName() +} + +func (p *ParamTable) initAddress() { + masterAddress, err := p.Load("master.address") + if err != nil { + panic(err) + } + p.Address = masterAddress +} + +func (p *ParamTable) initPort() { + masterPort, err := p.Load("master.port") + if err != nil { + panic(err) + } + port, err := strconv.Atoi(masterPort) + if err != nil { + panic(err) + } + p.Port = port +} + +func (p *ParamTable) initEtcdAddress() { + addr, err := p.Load("_EtcdAddress") + if err != nil { + panic(err) + } + p.EtcdAddress = addr +} + +func (p *ParamTable) initPulsarAddress() { + addr, err := p.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.PulsarAddress = addr +} + +func (p *ParamTable) initEtcdRootPath() { + path, err := p.Load("etcd.rootpath") + if err != nil { + panic(err) + } + p.EtcdRootPath = path +} + +func (p *ParamTable) initTopicNum() { + insertChannelRange, err := p.Load("msgChannel.channelRange.insert") + if err != nil { + panic(err) + } + + channelRange := strings.Split(insertChannelRange, ",") + if len(channelRange) != 2 { + panic("Illegal channel range num") + } + channelBegin, err := strconv.Atoi(channelRange[0]) + if err != nil { + panic(err) + } + channelEnd, err := strconv.Atoi(channelRange[1]) + if err != nil { + panic(err) + } + if channelBegin < 0 || channelEnd < 0 { + panic("Illegal channel range value") + } + if channelBegin > channelEnd { + panic("Illegal channel range value") + } + p.TopicNum = channelEnd +} + +func (p *ParamTable) initSegmentSize() { + threshold, err := p.Load("master.segment.size") + if err != nil { + panic(err) + } + segmentThreshold, err := strconv.ParseFloat(threshold, 64) + if err != nil { + panic(err) + } + 
p.SegmentSize = segmentThreshold +} + +func (p *ParamTable) initSegmentSizeFactor() { + segFactor, err := p.Load("master.segment.sizeFactor") + if err != nil { + panic(err) + } + factor, err := strconv.ParseFloat(segFactor, 64) + if err != nil { + panic(err) + } + p.SegmentSizeFactor = factor +} + +func (p *ParamTable) initDefaultRecordSize() { + size, err := p.Load("master.segment.defaultSizePerRecord") + if err != nil { + panic(err) + } + res, err := strconv.ParseInt(size, 10, 64) + if err != nil { + panic(err) + } + p.DefaultRecordSize = res +} + +func (p *ParamTable) initMinSegIDAssignCnt() { + size, err := p.Load("master.segment.minIDAssignCnt") + if err != nil { + panic(err) + } + res, err := strconv.ParseInt(size, 10, 64) + if err != nil { + panic(err) + } + p.MinSegIDAssignCnt = res +} + +func (p *ParamTable) initMaxSegIDAssignCnt() { + size, err := p.Load("master.segment.maxIDAssignCnt") + if err != nil { + panic(err) + } + res, err := strconv.ParseInt(size, 10, 64) + if err != nil { + panic(err) + } + p.MaxSegIDAssignCnt = res +} + +func (p *ParamTable) initSegIDAssignExpiration() { + duration, err := p.Load("master.segment.IDAssignExpiration") + if err != nil { + panic(err) + } + res, err := strconv.ParseInt(duration, 10, 64) + if err != nil { + panic(err) + } + p.SegIDAssignExpiration = res +} + +func (p *ParamTable) initQueryNodeNum() { + id, err := p.Load("nodeID.queryNodeIDList") + if err != nil { + panic(err) + } + ids := strings.Split(id, ",") + for _, i := range ids { + _, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + } + p.QueryNodeNum = len(ids) +} + +func (p *ParamTable) initQueryNodeStatsChannelName() { + channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") + if err != nil { + panic(err) + } + p.QueryNodeStatsChannelName = channels +} + +func (p *ParamTable) initProxyIDList() { + id, err := p.Load("nodeID.proxyIDList") + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + idList := make([]typeutil.UniqueID, 0, len(ids)) + for _, i := range ids { + v, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + idList = append(idList, typeutil.UniqueID(v)) + } + p.ProxyIDList = idList +} + +func (p *ParamTable) initProxyTimeTickChannelNames() { + ch, err := p.Load("msgChannel.chanNamePrefix.proxyTimeTick") + if err != nil { + log.Panic(err) + } + id, err := p.Load("nodeID.proxyIDList") + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + channels := make([]string, 0, len(ids)) + for _, i := range ids { + _, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + channels = append(channels, ch+"-"+i) + } + p.ProxyTimeTickChannelNames = channels +} + +func (p *ParamTable) initMsgChannelSubName() { + name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix") + if err != nil { + log.Panic(err) + } + p.MsgChannelSubName = name +} + +func (p *ParamTable) initSoftTimeTickBarrierInterval() { + t, err := p.Load("master.timeSync.softTimeTickBarrierInterval") + if err != nil { + log.Panic(err) + } + v, err := strconv.ParseInt(t, 10, 64) + if err != nil { + log.Panic(err) + } + p.SoftTimeTickBarrierInterval = tsoutil.ComposeTS(v, 0) +} + +func (p *ParamTable) initWriteNodeIDList() { + id, err := p.Load("nodeID.writeNodeIDList") + if err != nil { + 
log.Panic(err) + } + ids := strings.Split(id, ",") + idlist := make([]typeutil.UniqueID, 0, len(ids)) + for _, i := range ids { + v, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + idlist = append(idlist, typeutil.UniqueID(v)) + } + p.WriteNodeIDList = idlist +} + +func (p *ParamTable) initWriteNodeTimeTickChannelNames() { + ch, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick") + if err != nil { + log.Fatal(err) + } + id, err := p.Load("nodeID.writeNodeIDList") + if err != nil { + log.Panicf("load write node id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + channels := make([]string, 0, len(ids)) + for _, i := range ids { + _, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load write node id list error, %s", err.Error()) + } + channels = append(channels, ch+"-"+i) + } + p.WriteNodeTimeTickChannelNames = channels +} + +func (p *ParamTable) initInsertChannelNames() { + ch, err := p.Load("msgChannel.chanNamePrefix.insert") + if err != nil { + log.Fatal(err) + } + id, err := p.Load("nodeID.queryNodeIDList") + if err != nil { + log.Panicf("load query node id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + channels := make([]string, 0, len(ids)) + for _, i := range ids { + _, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load query node id list error, %s", err.Error()) + } + channels = append(channels, ch+"-"+i) + } + p.InsertChannelNames = channels +} + +func (p *ParamTable) initK2SChannelNames() { + ch, err := p.Load("msgChannel.chanNamePrefix.k2s") + if err != nil { + log.Fatal(err) + } + id, err := p.Load("nodeID.writeNodeIDList") + if err != nil { + log.Panicf("load write node id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + channels := make([]string, 0, len(ids)) + for _, i := range ids { + _, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load write node id list error, %s", err.Error()) + } + channels = append(channels, ch+"-"+i) + } + p.K2SChannelNames = channels +} diff --git a/internal/master/param_table_test.go b/internal/master/param_table_test.go new file mode 100644 index 0000000000..8940e961c0 --- /dev/null +++ b/internal/master/param_table_test.go @@ -0,0 +1,143 @@ +package master + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable_Init(t *testing.T) { + Params.Init() +} + +func TestParamTable_Address(t *testing.T) { + Params.Init() + address := Params.Address + assert.Equal(t, address, "localhost") +} + +func TestParamTable_Port(t *testing.T) { + Params.Init() + port := Params.Port + assert.Equal(t, port, 53100) +} + +func TestParamTable_EtcdRootPath(t *testing.T) { + Params.Init() + addr := Params.EtcdRootPath + assert.Equal(t, addr, "by-dev") +} + +func TestParamTable_TopicNum(t *testing.T) { + Params.Init() + num := Params.TopicNum + assert.Equal(t, num, 15) +} + +func TestParamTable_SegmentSize(t *testing.T) { + Params.Init() + size := Params.SegmentSize + assert.Equal(t, size, float64(512)) +} + +func TestParamTable_SegmentSizeFactor(t *testing.T) { + Params.Init() + factor := Params.SegmentSizeFactor + assert.Equal(t, factor, 0.75) +} + +func TestParamTable_DefaultRecordSize(t *testing.T) { + Params.Init() + size := Params.DefaultRecordSize + assert.Equal(t, size, int64(1024)) +} + +func TestParamTable_MinSegIDAssignCnt(t *testing.T) { + Params.Init() + cnt := Params.MinSegIDAssignCnt + assert.Equal(t, cnt, int64(1024)) +} + +func 
TestParamTable_MaxSegIDAssignCnt(t *testing.T) { + Params.Init() + cnt := Params.MaxSegIDAssignCnt + assert.Equal(t, cnt, int64(16384)) +} + +func TestParamTable_SegIDAssignExpiration(t *testing.T) { + Params.Init() + expiration := Params.SegIDAssignExpiration + assert.Equal(t, expiration, int64(2000)) +} + +func TestParamTable_QueryNodeNum(t *testing.T) { + Params.Init() + num := Params.QueryNodeNum + assert.Equal(t, num, 2) +} + +func TestParamTable_QueryNodeStatsChannelName(t *testing.T) { + Params.Init() + name := Params.QueryNodeStatsChannelName + assert.Equal(t, name, "query-node-stats") +} + +func TestParamTable_ProxyIDList(t *testing.T) { + Params.Init() + ids := Params.ProxyIDList + assert.Equal(t, len(ids), 2) + assert.Equal(t, ids[0], int64(1)) + assert.Equal(t, ids[1], int64(2)) +} + +func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) { + Params.Init() + names := Params.ProxyTimeTickChannelNames + assert.Equal(t, len(names), 2) + assert.Equal(t, names[0], "proxyTimeTick-1") + assert.Equal(t, names[1], "proxyTimeTick-2") +} + +func TestParamTable_MsgChannelSubName(t *testing.T) { + Params.Init() + name := Params.MsgChannelSubName + assert.Equal(t, name, "master") +} + +func TestParamTable_SoftTimeTickBarrierInterval(t *testing.T) { + Params.Init() + interval := Params.SoftTimeTickBarrierInterval + assert.Equal(t, interval, Timestamp(0x7d00000)) +} + +func TestParamTable_WriteNodeIDList(t *testing.T) { + Params.Init() + ids := Params.WriteNodeIDList + assert.Equal(t, len(ids), 2) + assert.Equal(t, ids[0], int64(5)) + assert.Equal(t, ids[1], int64(6)) +} + +func TestParamTable_WriteNodeTimeTickChannelNames(t *testing.T) { + Params.Init() + names := Params.WriteNodeTimeTickChannelNames + assert.Equal(t, len(names), 2) + assert.Equal(t, names[0], "writeNodeTimeTick-5") + assert.Equal(t, names[1], "writeNodeTimeTick-6") +} + +func TestParamTable_InsertChannelNames(t *testing.T) { + Params.Init() + names := Params.InsertChannelNames + assert.Equal(t, len(names), 2) + assert.Equal(t, names[0], "insert-3") + assert.Equal(t, names[1], "insert-4") +} + +func TestParamTable_K2SChannelNames(t *testing.T) { + Params.Init() + names := Params.K2SChannelNames + assert.Equal(t, len(names), 2) + assert.Equal(t, names[0], "k2s-5") + assert.Equal(t, names[1], "k2s-6") +} diff --git a/internal/master/paramtable.go b/internal/master/paramtable.go deleted file mode 100644 index efddbb012f..0000000000 --- a/internal/master/paramtable.go +++ /dev/null @@ -1,216 +0,0 @@ -package master - -import ( - "log" - "strconv" - "strings" - - "github.com/zilliztech/milvus-distributed/internal/util/paramtable" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type ParamTable struct { - paramtable.BaseTable -} - -var Params ParamTable - -func (p *ParamTable) InitParamTable() { - p.Init() -} - -func (p *ParamTable) Address() string { - masterAddress, _ := p.Load("master.address") - return masterAddress -} - -func (p *ParamTable) Port() int { - masterPort, _ := p.Load("master.port") - port, err := strconv.Atoi(masterPort) - if err != nil { - panic(err) - } - return port -} - -func (p *ParamTable) PulsarToic() string { - pulsarTopic, _ := p.Load("master.pulsartopic") - return pulsarTopic -} - -func (p *ParamTable) SegmentThreshold() float64 { - threshold, _ := p.Load("master.segmentThreshold") - segmentThreshold, err := strconv.ParseFloat(threshold, 32) - if err != nil { - panic(err) - } - return segmentThreshold -} - 
-func (p *ParamTable) DefaultRecordSize() int64 { - size, _ := p.Load("master.defaultSizePerRecord") - res, err := strconv.ParseInt(size, 10, 64) - if err != nil { - panic(err) - } - return res -} - -func (p *ParamTable) MinimumAssignSize() int64 { - size, _ := p.Load("master.minimumAssignSize") - res, err := strconv.ParseInt(size, 10, 64) - if err != nil { - panic(err) - } - return res -} - -func (p *ParamTable) SegmentExpireDuration() int64 { - duration, _ := p.Load("master.segmentExpireDuration") - res, err := strconv.ParseInt(duration, 10, 64) - if err != nil { - panic(err) - } - return res -} - -func (p *ParamTable) QueryNodeNum() (int, error) { - num, _ := p.Load("master.querynodenum") - return strconv.Atoi(num) -} - -func (p *ParamTable) StatsChannels() string { - channels, _ := p.Load("master.statsChannels") - return channels -} - -func (p *ParamTable) ProxyIDList() []typeutil.UniqueID { - id, err := p.Load("master.proxyidlist") - if err != nil { - log.Panicf("load proxy id list error, %s", err.Error()) - } - ids := strings.Split(id, ",") - idlist := make([]typeutil.UniqueID, 0, len(ids)) - for _, i := range ids { - v, err := strconv.ParseInt(i, 10, 64) - if err != nil { - log.Panicf("load proxy id list error, %s", err.Error()) - } - idlist = append(idlist, typeutil.UniqueID(v)) - } - return idlist -} - -func (p *ParamTable) ProxyTimeSyncChannels() []string { - chs, err := p.Load("master.proxyTimeSyncChannels") - if err != nil { - log.Panic(err) - } - return strings.Split(chs, ",") -} - -func (p *ParamTable) ProxyTimeSyncSubName() string { - name, err := p.Load("master.proxyTimeSyncSubName") - if err != nil { - log.Panic(err) - } - return name -} - -func (p *ParamTable) SoftTimeTickBarrierInterval() typeutil.Timestamp { - t, err := p.Load("master.softTimeTickBarrierInterval") - if err != nil { - log.Panic(err) - } - v, err := strconv.ParseInt(t, 10, 64) - if err != nil { - log.Panic(err) - } - return tsoutil.ComposeTS(v, 0) -} - -func (p *ParamTable) WriteIDList() []typeutil.UniqueID { - id, err := p.Load("master.writeidlist") - if err != nil { - log.Panic(err) - } - ids := strings.Split(id, ",") - idlist := make([]typeutil.UniqueID, 0, len(ids)) - for _, i := range ids { - v, err := strconv.ParseInt(i, 10, 64) - if err != nil { - log.Panicf("load proxy id list error, %s", err.Error()) - } - idlist = append(idlist, typeutil.UniqueID(v)) - } - return idlist -} - -func (p *ParamTable) WriteTimeSyncChannels() []string { - chs, err := p.Load("master.writeTimeSyncChannels") - if err != nil { - log.Fatal(err) - } - return strings.Split(chs, ",") -} - -func (p *ParamTable) WriteTimeSyncSubName() string { - name, err := p.Load("master.writeTimeSyncSubName") - if err != nil { - log.Fatal(err) - } - return name -} - -func (p *ParamTable) DMTimeSyncChannels() []string { - chs, err := p.Load("master.dmTimeSyncChannels") - if err != nil { - log.Fatal(err) - } - return strings.Split(chs, ",") -} - -func (p *ParamTable) K2STimeSyncChannels() []string { - chs, err := p.Load("master.k2sTimeSyncChannels") - if err != nil { - log.Fatal(err) - } - return strings.Split(chs, ",") -} - -func (p *ParamTable) PulsarAddress() string { - pulsarAddress, err := p.Load("_PulsarAddress") - if err != nil { - panic(err) - } - return pulsarAddress -} - -func (p *ParamTable) EtcdAddress() string { - etcdAddress, err := p.Load("_EtcdAddress") - if err != nil { - panic(err) - } - return etcdAddress -} - -func (p *ParamTable) EtcdRootPath() string { - etcdRootPath, err := p.Load("etcd.rootpath") - if err != nil { - 
panic(err) - } - return etcdRootPath -} - -func (p *ParamTable) TopicNum() int { - topicNum, err := p.Load("pulsar.topicnum") - if err != nil { - panic(err) - } - num, err := strconv.Atoi(topicNum) - if err != nil { - panic(err) - } - return num -} diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index b861bc15d6..cd784f9780 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -24,37 +24,46 @@ func TestMaster_Partition(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdAddr := Params.EtcdAddress() + etcdAddr := Params.EtcdAddress etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - opt := Option{ - KVRootPath: "/test/root/kv", - MetaRootPath: "/test/root/meta", - EtcdAddr: []string{etcdAddr}, - PulsarAddr: Params.PulsarAddress(), - ProxyIDs: []typeutil.UniqueID{1, 2}, - PulsarProxyChannels: []string{"proxy1", "proxy2"}, - PulsarProxySubName: "proxyTopics", - SoftTTBInterval: 300, - WriteIDs: []typeutil.UniqueID{3, 4}, - PulsarWriteChannels: []string{"write3", "write4"}, - PulsarWriteSubName: "writeTopics", - PulsarDMChannels: []string{"dm0", "dm1"}, - PulsarK2SChannels: []string{"k2s0", "k2s1"}, + Params = ParamTable{ + Address: Params.Address, + Port: Params.Port, + + EtcdAddress: Params.EtcdAddress, + EtcdRootPath: "/test/root", + PulsarAddress: Params.PulsarAddress, + + ProxyIDList: []typeutil.UniqueID{1, 2}, + WriteNodeIDList: []typeutil.UniqueID{3, 4}, + + TopicNum: 5, + QueryNodeNum: 3, + SoftTimeTickBarrierInterval: 300, + + // segment + SegmentSize: 536870912 / 1024 / 1024, + SegmentSizeFactor: 0.75, DefaultRecordSize: 1024, - MinimumAssignSize: 1048576, - SegmentThreshold: 536870912, - SegmentExpireDuration: 2000, - NumOfChannel: 5, - NumOfQueryNode: 3, - StatsChannels: "statistic", + MinSegIDAssignCnt: 1048576 / 1024, + MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, + SegIDAssignExpiration: 2000, + + // msgChannel + ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, + WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, + InsertChannelNames: []string{"dm0", "dm1"}, + K2SChannelNames: []string{"k2s0", "k2s1"}, + QueryNodeStatsChannelName: "statistic", + MsgChannelSubName: Params.MsgChannelSubName, } port := 10000 + rand.Intn(1000) - svr, err := CreateServer(ctx, &opt) + svr, err := CreateServer(ctx) assert.Nil(t, err) err = svr.Run(int64(port)) assert.Nil(t, err) diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index 0c45405685..c9d6b846c0 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -29,20 +29,21 @@ type segmentStatus struct { } type SegmentManager struct { - metaTable *metaTable - statsStream msgstream.MsgStream - channelRanges []*channelRange - segmentStatus map[UniqueID]*segmentStatus // segment id to segment status - collStatus map[UniqueID]*collectionStatus // collection id to collection status - defaultSizePerRecord int64 - minimumAssignSize int64 - segmentThreshold int64 - segmentExpireDuration int64 - numOfChannels int - numOfQueryNodes int - globalIDAllocator func() (UniqueID, error) - globalTSOAllocator func() (Timestamp, error) - mu sync.RWMutex + metaTable *metaTable + statsStream msgstream.MsgStream + channelRanges []*channelRange + segmentStatus map[UniqueID]*segmentStatus // segment id to segment status + collStatus 
+	defaultSizePerRecord   int64
+	minimumAssignSize      int64
+	segmentThreshold       float64
+	segmentThresholdFactor float64
+	segmentExpireDuration  int64
+	numOfChannels          int
+	numOfQueryNodes        int
+	globalIDAllocator      func() (UniqueID, error)
+	globalTSOAllocator     func() (Timestamp, error)
+	mu                     sync.RWMutex
 }
 
 func (segMgr *SegmentManager) HandleQueryNodeMsgPack(msgPack *msgstream.MsgPack) error {
@@ -76,7 +77,7 @@ func (segMgr *SegmentManager) handleSegmentStat(segStats *internalpb.SegmentStat
 	segMeta.NumRows = segStats.NumRows
 	segMeta.MemSize = segStats.MemorySize
 
-	if segStats.MemorySize > segMgr.segmentThreshold {
+	if segStats.MemorySize > int64(segMgr.segmentThresholdFactor*segMgr.segmentThreshold) {
 		return segMgr.closeSegment(segMeta)
 	}
 	return segMgr.metaTable.UpdateSegment(segMeta)
@@ -150,6 +151,7 @@ func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDReques
 
 func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, partitionTag string, count uint32,
 	channelID int32, collStatus *collectionStatus) (*internalpb.SegIDAssignment, error) {
+	segmentThreshold := int64(segMgr.segmentThreshold)
 	for _, segID := range collStatus.openedSegments {
 		segMeta, _ := segMgr.metaTable.GetSegmentByID(segID)
 		if segMeta.GetCloseTime() != 0 || channelID < segMeta.GetChannelStart() ||
@@ -160,8 +162,8 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
 		assignedMem := segMgr.checkAssignedSegExpire(segID)
 		memSize := segMeta.MemSize
 		neededMemSize := segMgr.calNeededSize(memSize, segMeta.NumRows, int64(count))
-		if memSize+assignedMem+neededMemSize <= segMgr.segmentThreshold {
-			remainingSize := segMgr.segmentThreshold - memSize - assignedMem
+		if memSize+assignedMem+neededMemSize <= segmentThreshold {
+			remainingSize := segmentThreshold - memSize - assignedMem
 			allocMemSize := segMgr.calAllocMemSize(neededMemSize, remainingSize)
 			segMgr.addAssignment(segID, allocMemSize)
 			return &internalpb.SegIDAssignment{
@@ -174,7 +176,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
 	}
 
 	neededMemSize := segMgr.defaultSizePerRecord * int64(count)
-	if neededMemSize > segMgr.segmentThreshold {
+	if neededMemSize > segmentThreshold {
 		return nil, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
 			count, neededMemSize)
 	}
@@ -184,7 +186,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
 		return nil, err
 	}
 
-	allocMemSize := segMgr.calAllocMemSize(neededMemSize, segMgr.segmentThreshold)
+	allocMemSize := segMgr.calAllocMemSize(neededMemSize, segmentThreshold)
 	segMgr.addAssignment(segMeta.SegmentID, allocMemSize)
 	return &internalpb.SegIDAssignment{
 		SegID: segMeta.SegmentID,
@@ -322,23 +324,23 @@ func (segMgr *SegmentManager) createChannelRanges() error {
 }
 
 func NewSegmentManager(meta *metaTable,
-	opt *Option,
 	globalIDAllocator func() (UniqueID, error),
 	globalTSOAllocator func() (Timestamp, error),
 ) *SegmentManager {
 	segMgr := &SegmentManager{
-		metaTable:             meta,
-		channelRanges:         make([]*channelRange, 0),
-		segmentStatus:         make(map[UniqueID]*segmentStatus),
-		collStatus:            make(map[UniqueID]*collectionStatus),
-		segmentThreshold:      int64(opt.SegmentThreshold),
-		segmentExpireDuration: opt.SegmentExpireDuration,
-		minimumAssignSize:     opt.MinimumAssignSize,
-		defaultSizePerRecord:  opt.DefaultRecordSize,
-		numOfChannels:         opt.NumOfChannel,
-		numOfQueryNodes:       opt.NumOfQueryNode,
-		globalIDAllocator:     globalIDAllocator,
-		globalTSOAllocator:    globalTSOAllocator,
+		metaTable:              meta,
+		channelRanges:          make([]*channelRange, 0),
+		segmentStatus:          make(map[UniqueID]*segmentStatus),
+		collStatus:             make(map[UniqueID]*collectionStatus),
+		segmentThreshold:       Params.SegmentSize * 1024 * 1024,
+		segmentThresholdFactor: Params.SegmentSizeFactor,
+		segmentExpireDuration:  Params.SegIDAssignExpiration,
+		minimumAssignSize:      Params.MinSegIDAssignCnt * Params.DefaultRecordSize,
+		defaultSizePerRecord:   Params.DefaultRecordSize,
+		numOfChannels:          Params.TopicNum,
+		numOfQueryNodes:        Params.QueryNodeNum,
+		globalIDAllocator:      globalIDAllocator,
+		globalTSOAllocator:     globalTSOAllocator,
 	}
 	segMgr.createChannelRanges()
 	return segMgr
diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go
index f6556c155e..7063647be7 100644
--- a/internal/master/segment_manager_test.go
+++ b/internal/master/segment_manager_test.go
@@ -7,22 +7,21 @@ import (
 	"testing"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
-	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
-	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
-	"google.golang.org/grpc"
-
+	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
 	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 	"go.etcd.io/etcd/clientv3"
+	"google.golang.org/grpc"
 )
 
 var mt *metaTable
@@ -36,7 +35,7 @@ var masterCancelFunc context.CancelFunc
 
 func setup() {
 	Params.Init()
-	etcdAddress := Params.EtcdAddress()
+	etcdAddress := Params.EtcdAddress
 
 	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
 	if err != nil {
@@ -77,18 +76,18 @@ func setup() {
 	if err != nil {
 		panic(err)
 	}
-	opt := &Option{
-		SegmentThreshold:      536870912,
-		SegmentExpireDuration: 2000,
-		MinimumAssignSize:     1048576,
-		DefaultRecordSize:     1024,
-		NumOfQueryNode:        3,
-		NumOfChannel:          5,
-	}
 
 	var cnt int64
-	segMgr = NewSegmentManager(mt, opt,
+	Params.TopicNum = 5
+	Params.QueryNodeNum = 3
+	Params.SegmentSize = 536870912 / 1024 / 1024
+	Params.SegmentSizeFactor = 0.75
+	Params.DefaultRecordSize = 1024
+	Params.MinSegIDAssignCnt = 1048576 / 1024
+	Params.SegIDAssignExpiration = 2000
+
+	segMgr = NewSegmentManager(mt,
 		func() (UniqueID, error) {
 			val := atomic.AddInt64(&cnt, 1)
 			return val, nil
@@ -209,7 +208,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
 
 	// close segment
 	stats.SegStats[0].NumRows = 600000
-	stats.SegStats[0].MemorySize = 600000000
+	stats.SegStats[0].MemorySize = int64(0.8 * segMgr.segmentThreshold)
 	err = segMgr.HandleQueryNodeMsgPack(&msgPack)
 	assert.Nil(t, err)
 	segMeta, _ = mt.GetSegmentByID(100)
@@ -219,7 +218,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
 
 func startupMaster() {
 	Params.Init()
-	etcdAddress := Params.EtcdAddress()
+	etcdAddress := Params.EtcdAddress
 	rootPath := "/test/root"
 	ctx, cancel := context.WithCancel(context.TODO())
 	masterCancelFunc = cancel
@@ -231,32 +230,40 @@ func startupMaster() {
 	if err != nil {
 		panic(err)
 	}
-	pulsarAddress := Params.PulsarAddress()
-
-	opt := &Option{
-		KVRootPath:            "/test/root/kv",
-		MetaRootPath:          "/test/root/meta",
-		EtcdAddr:              []string{etcdAddress},
-		PulsarAddr:            pulsarAddress,
-		ProxyIDs:              []typeutil.UniqueID{1, 2},
-		PulsarProxyChannels:   []string{"proxy1", "proxy2"},
-		PulsarProxySubName:    "proxyTopics",
-		SoftTTBInterval:       300,
-		WriteIDs:              []typeutil.UniqueID{3, 4},
-		PulsarWriteChannels:   []string{"write3", "write4"},
-		PulsarWriteSubName:    "writeTopics",
-		PulsarDMChannels:      []string{"dm0", "dm1"},
-		PulsarK2SChannels:     []string{"k2s0", "k2s1"},
+	Params = ParamTable{
+		Address: Params.Address,
+		Port:    Params.Port,
+
+		EtcdAddress:   Params.EtcdAddress,
+		EtcdRootPath:  rootPath,
+		PulsarAddress: Params.PulsarAddress,
+
+		ProxyIDList:     []typeutil.UniqueID{1, 2},
+		WriteNodeIDList: []typeutil.UniqueID{3, 4},
+
+		TopicNum:                    5,
+		QueryNodeNum:                3,
+		SoftTimeTickBarrierInterval: 300,
+
+		// segment
+		SegmentSize:       536870912 / 1024 / 1024,
+		SegmentSizeFactor: 0.75,
 		DefaultRecordSize:     1024,
-		MinimumAssignSize:     1048576,
-		SegmentThreshold:      536870912,
-		SegmentExpireDuration: 2000,
-		NumOfChannel:          5,
-		NumOfQueryNode:        3,
-		StatsChannels:         "statistic",
+		MinSegIDAssignCnt:     1048576 / 1024,
+		MaxSegIDAssignCnt:     Params.MaxSegIDAssignCnt,
+		SegIDAssignExpiration: 2000,
+
+		// msgChannel
+		ProxyTimeTickChannelNames:     []string{"proxy1", "proxy2"},
+		WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
+		InsertChannelNames:            []string{"dm0", "dm1"},
+		K2SChannelNames:               []string{"k2s0", "k2s1"},
+		QueryNodeStatsChannelName:     "statistic",
+		MsgChannelSubName:             Params.MsgChannelSubName,
 	}
-	master, err = CreateServer(ctx, opt)
+
+	master, err = CreateServer(ctx)
 	if err != nil {
 		panic(err)
 	}
@@ -327,7 +334,7 @@ func TestSegmentManager_RPC(t *testing.T) {
 
 	// test stats
 	segID := assignments[0].SegID
-	pulsarAddress := Params.PulsarAddress()
+	pulsarAddress := Params.PulsarAddress
 	ms := msgstream.NewPulsarMsgStream(ctx, 1024)
 	ms.SetPulsarClient(pulsarAddress)
 	ms.CreatePulsarProducers([]string{"statistic"})
diff --git a/internal/master/time_snyc_producer_test.go b/internal/master/time_snyc_producer_test.go
index 340106690f..4bb98f5129 100644
--- a/internal/master/time_snyc_producer_test.go
+++ b/internal/master/time_snyc_producer_test.go
@@ -80,7 +80,7 @@ func receiveMsg(stream *ms.MsgStream) []uint64 {
 
 func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
 	Init()
-	pulsarAddress := Params.PulsarAddress()
+	pulsarAddress := Params.PulsarAddress
 
 	producerChannels := []string{"proxyTtBarrier"}
 	consumerChannels := []string{"proxyTtBarrier"}
diff --git a/internal/master/timesync_test.go b/internal/master/timesync_test.go
index d89b1df224..a0cdb2cef5 100644
--- a/internal/master/timesync_test.go
+++ b/internal/master/timesync_test.go
@@ -68,7 +68,7 @@ func getEmptyMsgPack() *ms.MsgPack {
 
 func producer(channels []string, ttmsgs [][2]int) (*ms.MsgStream, *ms.MsgStream) {
 	Init()
-	pulsarAddress := Params.PulsarAddress()
+	pulsarAddress := Params.PulsarAddress
 	consumerSubName := "subTimetick"
 	producerChannels := channels
 	consumerChannels := channels
diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go
index f17ba0bc2c..1c324d2fd1 100644
--- a/internal/proxy/proxy_test.go
+++ b/internal/proxy/proxy_test.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
"log" "os" - "path" "strconv" "sync" "testing" @@ -20,7 +19,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" ) @@ -38,10 +36,8 @@ var testNum = 10 func startMaster(ctx context.Context) { master.Init() - etcdAddr := master.Params.EtcdAddress() - rootPath := master.Params.EtcdRootPath() - kvRootPath := path.Join(rootPath, "kv") - metaRootPath := path.Join(rootPath, "meta") + etcdAddr := master.Params.EtcdAddress + rootPath := master.Params.EtcdRootPath etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) if err != nil { @@ -52,35 +48,12 @@ func startMaster(ctx context.Context) { panic(err) } - opt := master.Option{ - KVRootPath: kvRootPath, - MetaRootPath: metaRootPath, - EtcdAddr: []string{etcdAddr}, - PulsarAddr: Params.PulsarAddress(), - ProxyIDs: []typeutil.UniqueID{1, 2}, - PulsarProxyChannels: []string{"proxy1", "proxy2"}, - PulsarProxySubName: "proxyTopics", - SoftTTBInterval: 300, - WriteIDs: []typeutil.UniqueID{3, 4}, - PulsarWriteChannels: []string{"write3", "write4"}, - PulsarWriteSubName: "writeTopics", - PulsarDMChannels: []string{"dm0", "dm1"}, - PulsarK2SChannels: []string{"k2s0", "k2s1"}, - DefaultRecordSize: 1024, - MinimumAssignSize: 1048576, - SegmentThreshold: 536870912, - SegmentExpireDuration: 2000, - NumOfChannel: 5, - NumOfQueryNode: 3, - StatsChannels: "statistic", - } - - svr, err := master.CreateServer(ctx, &opt) + svr, err := master.CreateServer(ctx) masterServer = svr if err != nil { log.Print("create server failed", zap.Error(err)) } - if err := svr.Run(int64(master.Params.Port())); err != nil { + if err := svr.Run(int64(master.Params.Port)); err != nil { log.Fatal("run server failed", zap.Error(err)) }