diff --git a/cmd/master/main.go b/cmd/master/main.go index 94862814e3..6ae67b0d51 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -21,6 +21,7 @@ func main() { etcdAddress, _ := masterParams.Params.EtcdAddress() etcdRootPath, _ := masterParams.Params.EtcdRootPath() pulsarAddr, _ := masterParams.Params.PulsarAddress() + pulsarAddr = "pulsar://" + pulsarAddr defaultRecordSize := masterParams.Params.DefaultRecordSize() minimumAssignSize := masterParams.Params.MinimumAssignSize() segmentThreshold := masterParams.Params.SegmentThreshold() @@ -34,15 +35,15 @@ func main() { MetaRootPath: etcdRootPath, EtcdAddr: []string{etcdAddress}, PulsarAddr: pulsarAddr, - ProxyIDs: nil, - PulsarProxyChannels: nil, - PulsarProxySubName: "", - SoftTTBInterval: 0, - WriteIDs: nil, - PulsarWriteChannels: nil, - PulsarWriteSubName: "", - PulsarDMChannels: nil, - PulsarK2SChannels: nil, + ProxyIDs: masterParams.Params.ProxyIDList(), + PulsarProxyChannels: masterParams.Params.ProxyTimeSyncChannels(), + PulsarProxySubName: masterParams.Params.ProxyTimeSyncSubName(), + SoftTTBInterval: masterParams.Params.SoftTimeTickBarrierInterval(), + WriteIDs: masterParams.Params.WriteIDList(), + PulsarWriteChannels: masterParams.Params.WriteTimeSyncChannels(), + PulsarWriteSubName: masterParams.Params.WriteTimeSyncSubName(), + PulsarDMChannels: masterParams.Params.DMTimeSyncChannels(), + PulsarK2SChannels: masterParams.Params.K2STimeSyncChannels(), DefaultRecordSize: defaultRecordSize, MinimumAssignSize: minimumAssignSize, SegmentThreshold: segmentThreshold, diff --git a/configs/advanced/flow_graph.yaml b/configs/advanced/flow_graph.yaml deleted file mode 100644 index d6590177df..0000000000 --- a/configs/advanced/flow_graph.yaml +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (C) 2019-2020 Zilliz. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under the License. - -flowGraph: - maxQueueLength: 1024 - maxParallelism: 1024 diff --git a/configs/advanced/reader.yaml b/configs/advanced/reader.yaml index f8d1c15905..f4696dd3e2 100644 --- a/configs/advanced/reader.yaml +++ b/configs/advanced/reader.yaml @@ -9,15 +9,26 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -service: - statsServiceTimeInterval: 1000 # milliseconds +reader: + stats: + publishInterval: 1000 # milliseconds -msgStream: - receiveBufSize: # msgPack chan buffer size - statsMsgStream: 64 - dmMsgStream: 1024 - searchMsgStream: 512 - searchResultMsgStream: 64 - pulsarBufSize: # pulsar chan buffer size - search: 512 - dm: 1024 + dataSync: + flowGraph: + maxQueueLength: 1024 + maxParallelism: 1024 + + msgStream: + dm: + recvBufSize: 1024 # msgPack chan buffer size + pulsarBufSize: 1024 # pulsar chan buffer size + + search: + recvBufSize: 512 + pulsarBufSize: 512 + + searchResult: + recvBufSize: 64 + + stats: + recvBufSize: 64 diff --git a/configs/config.yaml b/configs/config.yaml index 5b2a9881f3..87f4075fe0 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -14,11 +14,23 @@ master: port: 53100 pulsarmoniterinterval: 1 pulsartopic: "monitor-topic" + + proxyidlist: [1, 2] + proxyTimeSyncChannels: ["proxy1", "proxy2"] + proxyTimeSyncSubName: "proxy-topic" + softTimeTickBarrierInterval: 500 + + writeidlist: [3, 4] + writeTimeSyncChannels: ["write3", "write4"] + writeTimeSyncSubName: "write-topic" + + dmTimeSyncChannels: ["dm5", "dm6"] + k2sTimeSyncChannels: ["k2s7", "k2s8"] + defaultSizePerRecord: 1024 minimumAssignSize: 1048576 segmentThreshold: 536870912 segmentExpireDuration: 2000 - proxyidlist: [0] querynodenum: 1 writenodenum: 1 statsChannels: "statistic" diff --git a/go.mod b/go.mod index b5f9d7c194..cda9129e8e 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/sirupsen/logrus v1.6.0 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect github.com/spaolacci/murmur3 v1.1.0 + github.com/spf13/cast v1.3.0 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tikv/client-go v0.0.0-20200824032810-95774393107b diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt index 887f18e843..395bb345d4 100644 --- a/internal/core/src/query/CMakeLists.txt +++ b/internal/core/src/query/CMakeLists.txt @@ -6,6 +6,7 @@ set(MILVUS_QUERY_SRCS visitors/ShowPlanNodeVisitor.cpp visitors/ExecPlanNodeVisitor.cpp visitors/ShowExprVisitor.cpp + visitors/ExecExprVisitor.cpp Plan.cpp ) add_library(milvus_query ${MILVUS_QUERY_SRCS}) diff --git a/internal/core/src/query/Expr.h b/internal/core/src/query/Expr.h index 5851594539..d0d0fa7ec3 100644 --- a/internal/core/src/query/Expr.h +++ b/internal/core/src/query/Expr.h @@ -22,18 +22,10 @@ using ExprPtr = std::unique_ptr; struct BinaryExpr : Expr { ExprPtr left_; ExprPtr right_; - - public: - void - accept(ExprVisitor&) = 0; }; struct UnaryExpr : Expr { ExprPtr child_; - - public: - void - accept(ExprVisitor&) = 0; }; // TODO: not enabled in sprint 1 @@ -60,7 +52,7 @@ using FieldId = std::string; struct TermExpr : Expr { FieldId field_id_; - DataType data_type_; + DataType data_type_ = DataType::NONE; // std::vector terms_; protected: @@ -74,7 +66,7 @@ struct TermExpr : Expr { struct RangeExpr : Expr { FieldId field_id_; - DataType data_type_; + DataType data_type_ = DataType::NONE; enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual }; static const std::map mapping_; // op_name -> op diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 882e37e46a..d4bde83788 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -52,8 +52,10 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js expr->data_type_ = data_type; expr->field_id_ = field_name; for (auto& item : body.items()) { - auto& op_name = item.key(); - auto op = RangeExpr::mapping_.at(to_lower(op_name)); + auto op_name = to_lower(item.key()); + + AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found"); + auto op = RangeExpr::mapping_.at(op_name); T value = item.value(); expr->conditions_.emplace_back(op, value); } @@ -130,7 +132,6 @@ CreatePlan(const Schema& schema, const std::string& dsl_str) { std::unique_ptr ParsePlaceholderGroup(const Plan* plan, const std::string& blob) { - (void)plan; namespace ser = milvus::proto::service; auto result = std::make_unique(); ser::PlaceholderGroup ph_group; @@ -139,9 +140,14 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) { for (auto& info : ph_group.placeholders()) { Placeholder element; element.tag_ = info.tag(); + Assert(plan->tag2field_.count(element.tag_)); + auto field_id = plan->tag2field_.at(element.tag_); + auto& field_meta = plan->schema_[field_id]; element.num_of_queries_ = info.values_size(); AssertInfo(element.num_of_queries_, "must have queries"); + Assert(element.num_of_queries_ > 0); element.line_sizeof_ = info.values().Get(0).size(); + Assert(field_meta.get_sizeof() == element.line_sizeof_); auto& target = element.blob_; target.reserve(element.line_sizeof_ * element.num_of_queries_); for (auto& line : info.values()) { diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 81b6daf497..e1d738114a 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -14,7 +14,7 @@ using Json = nlohmann::json; // class definitions struct Plan { public: - Plan(const Schema& schema) : schema_(schema) { + explicit Plan(const Schema& schema) : schema_(schema) { } public: diff --git a/internal/core/src/query/PlanNode.h b/internal/core/src/query/PlanNode.h index 0cb1d06efc..f164e7ee85 100644 --- a/internal/core/src/query/PlanNode.h +++ b/internal/core/src/query/PlanNode.h @@ -38,10 +38,6 @@ struct VectorPlanNode : PlanNode { std::optional predicate_; QueryInfo query_info_; std::string placeholder_tag_; - - public: - virtual void - accept(PlanNodeVisitor&) = 0; }; struct FloatVectorANNS : VectorPlanNode { diff --git a/internal/core/src/query/generated/ExecExprVisitor.cpp b/internal/core/src/query/generated/ExecExprVisitor.cpp new file mode 100644 index 0000000000..47ef676a2a --- /dev/null +++ b/internal/core/src/query/generated/ExecExprVisitor.cpp @@ -0,0 +1,25 @@ +#error TODO: copy this file out, and modify the content. +#include "query/generated/ExecExprVisitor.h" + +namespace milvus::query { +void +ExecExprVisitor::visit(BoolUnaryExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(BoolBinaryExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(TermExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(RangeExpr& expr) { + // TODO +} + +} // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h new file mode 100644 index 0000000000..4a38bbb06c --- /dev/null +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -0,0 +1,41 @@ +#pragma once +// Generated File +// DO NOT EDIT +#include "segcore/SegmentNaive.h" +#include +#include "ExprVisitor.h" + +namespace milvus::query { +class ExecExprVisitor : ExprVisitor { + public: + void + visit(BoolUnaryExpr& expr) override; + + void + visit(BoolBinaryExpr& expr) override; + + void + visit(TermExpr& expr) override; + + void + visit(RangeExpr& expr) override; + + public: + using RetType = faiss::ConcurrentBitsetPtr; + explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) { + } + RetType + call_child(Expr& expr) { + Assert(!ret_.has_value()); + expr.accept(*this); + Assert(ret_.has_value()); + auto ret = std::move(ret_); + ret_ = std::nullopt; + return std::move(ret.value()); + } + + private: + segcore::SegmentNaive& segment_; + std::optional ret_; +}; +} // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index b1a1017960..58785ca0e7 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -9,10 +9,10 @@ namespace milvus::query { class ExecPlanNodeVisitor : PlanNodeVisitor { public: - virtual void + void visit(FloatVectorANNS& node) override; - virtual void + void visit(BinaryVectorANNS& node) override; public: diff --git a/internal/core/src/query/generated/ShowExprVisitor.h b/internal/core/src/query/generated/ShowExprVisitor.h index 53053480ae..b76844417e 100644 --- a/internal/core/src/query/generated/ShowExprVisitor.h +++ b/internal/core/src/query/generated/ShowExprVisitor.h @@ -9,16 +9,16 @@ namespace milvus::query { class ShowExprVisitor : ExprVisitor { public: - virtual void + void visit(BoolUnaryExpr& expr) override; - virtual void + void visit(BoolBinaryExpr& expr) override; - virtual void + void visit(TermExpr& expr) override; - virtual void + void visit(RangeExpr& expr) override; public: diff --git a/internal/core/src/query/generated/ShowPlanNodeVisitor.h b/internal/core/src/query/generated/ShowPlanNodeVisitor.h index 1835cb5547..03afe8ec85 100644 --- a/internal/core/src/query/generated/ShowPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ShowPlanNodeVisitor.h @@ -10,10 +10,10 @@ namespace milvus::query { class ShowPlanNodeVisitor : PlanNodeVisitor { public: - virtual void + void visit(FloatVectorANNS& node) override; - virtual void + void visit(BinaryVectorANNS& node) override; public: diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp new file mode 100644 index 0000000000..055762511b --- /dev/null +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -0,0 +1,51 @@ +#include "segcore/SegmentNaive.h" +#include +#include "query/generated/ExecExprVisitor.h" + +namespace milvus::query { +#if 1 +// THIS CONTAINS EXTRA BODY FOR VISITOR +// WILL BE USED BY GENERATOR +namespace impl { +class ExecExprVisitor : ExprVisitor { + public: + using RetType = faiss::ConcurrentBitsetPtr; + explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) { + } + RetType + call_child(Expr& expr) { + Assert(!ret_.has_value()); + expr.accept(*this); + Assert(ret_.has_value()); + auto ret = std::move(ret_); + ret_ = std::nullopt; + return std::move(ret.value()); + } + + private: + segcore::SegmentNaive& segment_; + std::optional ret_; +}; +} // namespace impl +#endif + +void +ExecExprVisitor::visit(BoolUnaryExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(BoolBinaryExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(TermExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(RangeExpr& expr) { +} + +} // namespace milvus::query diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index a339390105..82d628cd21 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -3,7 +3,6 @@ set(SEGCORE_FILES SegmentNaive.cpp SegmentSmallIndex.cpp IndexMeta.cpp - ConcurrentVector.cpp Collection.cpp collection_c.cpp segment_c.cpp diff --git a/internal/core/src/segcore/Collection.cpp b/internal/core/src/segcore/Collection.cpp index fb3a1df0ea..c7084df73d 100644 --- a/internal/core/src/segcore/Collection.cpp +++ b/internal/core/src/segcore/Collection.cpp @@ -4,7 +4,6 @@ #include "pb/etcd_meta.pb.h" #include #include -#include namespace milvus::segcore { @@ -134,7 +133,7 @@ Collection::parse() { auto schema = std::make_shared(); for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) { const auto& type_params = child.type_params(); - int dim = 16; + int64_t dim = 16; for (const auto& type_param : type_params) { if (type_param.key() == "dim") { dim = strtoll(type_param.value().c_str(), nullptr, 10); diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp deleted file mode 100644 index 6be1cb74f0..0000000000 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ /dev/null @@ -1,5 +0,0 @@ - -#include -#include "segcore/ConcurrentVector.h" - -namespace milvus::segcore {} diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/IndexingEntry.cpp index b06fc69716..44bc2a43d2 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/IndexingEntry.cpp @@ -11,7 +11,8 @@ IndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBas assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field_meta_.get_dim(); - auto source = static_cast*>(vec_base); + auto source = dynamic_cast*>(vec_base); + Assert(source); auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); auto conf = get_build_conf(); diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp index 6e79976f00..917b4b524a 100644 --- a/internal/core/src/segcore/SegmentNaive.cpp +++ b/internal/core/src/segcore/SegmentNaive.cpp @@ -451,10 +451,10 @@ SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp ti Status SegmentNaive::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("insert not ready"); + PanicInfo("insert not ready"); } - if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("delete not ready"); + if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) { + PanicInfo("delete not ready"); } state_ = SegmentState::Closed; return Status::OK(); diff --git a/internal/core/src/segcore/SegmentNaive.h b/internal/core/src/segcore/SegmentNaive.h index 07bdf9e451..a4e35afb56 100644 --- a/internal/core/src/segcore/SegmentNaive.h +++ b/internal/core/src/segcore/SegmentNaive.h @@ -20,8 +20,6 @@ namespace milvus::segcore { class SegmentNaive : public SegmentBase { public: - virtual ~SegmentNaive() = default; - // SegmentBase(std::shared_ptr collection); int64_t @@ -47,7 +45,7 @@ class SegmentNaive : public SegmentBase { Status QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override; - virtual Status + Status Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], @@ -106,7 +104,7 @@ class SegmentNaive : public SegmentBase { friend std::unique_ptr CreateSegment(SchemaPtr schema); - explicit SegmentNaive(SchemaPtr schema) : schema_(schema), record_(*schema) { + explicit SegmentNaive(const SchemaPtr& schema) : schema_(schema), record_(*schema) { } private: diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index 272ecb157e..140f55d50f 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -50,7 +50,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier, for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; - // map uid to corrensponding offsets, select the max one, which should be the target + // map uid to corresponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto [iter_b, iter_e] = uid2offset_.equal_range(uid); @@ -73,7 +73,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier, for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; - // map uid to corrensponding offsets, select the max one, which should be the target + // map uid to corresponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto [iter_b, iter_e] = uid2offset_.equal_range(uid); @@ -228,7 +228,7 @@ SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info, QueryResult& results) { // step 1: binary search to find the barrier of the snapshot auto ins_barrier = get_barrier(record_, timestamp); - auto del_barrier = get_barrier(deleted_record_, timestamp); + // auto del_barrier = get_barrier(deleted_record_, timestamp); #if 0 auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); Assert(bitmap_holder); @@ -321,7 +321,6 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta x = dis(e); } } - int64_t inferred_dim = query_info->query_raw_data.size() / query_info->num_queries; // TODO query::QueryInfo info{ query_info->topK, @@ -338,10 +337,10 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta Status SegmentSmallIndex::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("insert not ready"); + PanicInfo("insert not ready"); } - if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("delete not ready"); + if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) { + PanicInfo("delete not ready"); } state_ = SegmentState::Closed; return Status::OK(); @@ -357,8 +356,6 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto dim = field.get_dim(); auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode); - auto chunk_size = record_.uids_.chunk_size(); - auto& uids = record_.uids_; auto entities = record_.get_vec_entity(offset); @@ -398,7 +395,7 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) { auto index_meta = std::make_shared(schema_); // TODO: this is merge of query conf and insert conf - // TODO: should be splitted into multiple configs + // TODO: should be split into multiple configs auto conf = milvus::knowhere::Config{ {milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100}, {milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4}, @@ -431,8 +428,16 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) { } index_ready_ = true; -#endif return Status::OK(); +#endif +} + +static uint64_t +upper_align(int64_t value, int64_t align) { + Assert(align > 0); + Assert((align & (align - 1)) == 0); + auto groups = (value + align - 1) / align; + return groups * align; } int64_t @@ -448,9 +453,9 @@ SegmentSmallIndex::GetMemoryUsageInBytes() { } } #endif - int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1); + int64_t ins_n = upper_align(record_.reserved, DefaultElementPerChunk); total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1); - int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1); + int64_t del_n = upper_align(deleted_record_.reserved, DefaultElementPerChunk); total_bytes += del_n * (16 * 2); return total_bytes; } diff --git a/internal/core/src/segcore/SegmentSmallIndex.h b/internal/core/src/segcore/SegmentSmallIndex.h index 0a4282c08c..99d1a25d1d 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.h +++ b/internal/core/src/segcore/SegmentSmallIndex.h @@ -46,10 +46,6 @@ namespace milvus::segcore { class SegmentSmallIndex : public SegmentBase { public: - virtual ~SegmentSmallIndex() = default; - - // SegmentBase(std::shared_ptr collection); - int64_t PreInsert(int64_t size) override; @@ -111,6 +107,9 @@ class SegmentSmallIndex : public SegmentBase { GetMemoryUsageInBytes() override; public: + void + get_insert_record(); + ssize_t get_row_count() const override { return record_.ack_responder_.GetAck(); @@ -130,7 +129,8 @@ class SegmentSmallIndex : public SegmentBase { friend std::unique_ptr CreateSegment(SchemaPtr schema); - explicit SegmentSmallIndex(SchemaPtr schema) : schema_(schema), record_(*schema_), indexing_record_(*schema_) { + explicit SegmentSmallIndex(SchemaPtr schema) + : schema_(std::move(schema)), record_(*schema_), indexing_record_(*schema_) { } public: diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index de59eef953..62dff9c310 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -141,7 +141,7 @@ TEST(CApiTest, SearchTest) { auto blob = raw_group.SerializeAsString(); auto plan = CreatePlan(collection, dsl_string); - auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length()); + auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length()); std::vector placeholderGroups; placeholderGroups.push_back(placeholderGroup); timestamps.clear(); @@ -228,7 +228,7 @@ TEST(CApiTest, BuildIndexTest) { auto blob = raw_group.SerializeAsString(); auto plan = CreatePlan(collection, dsl_string); - auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length()); + auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length()); std::vector placeholderGroups; placeholderGroups.push_back(placeholderGroup); timestamps.clear(); diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index b29fe10dfa..d3d97def8c 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -133,7 +133,7 @@ TEST(Query, ParsePlaceholderGroup) { { "bool": { "vector": { - "Vec": { + "fakevec": { "metric_type": "L2", "params": { "nprobe": 10 diff --git a/internal/master/paramtable/paramtable.go b/internal/master/paramtable/paramtable.go index f7aaa2539e..1d48a8fb4b 100644 --- a/internal/master/paramtable/paramtable.go +++ b/internal/master/paramtable/paramtable.go @@ -1,9 +1,13 @@ package paramtable import ( + "log" "strconv" + "strings" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type ParamTable struct { @@ -80,3 +84,97 @@ func (p *ParamTable) StatsChannels() string { channels, _ := p.Load("master.statsChannels") return channels } + +func (p *ParamTable) ProxyIDList() []typeutil.UniqueID { + id, err := p.Load("master.proxyidlist") + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + ids := strings.Split(id, ",") + idlist := make([]typeutil.UniqueID, 0, len(ids)) + for _, i := range ids { + v, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + idlist = append(idlist, typeutil.UniqueID(v)) + } + return idlist +} + +func (p *ParamTable) ProxyTimeSyncChannels() []string { + chs, err := p.Load("master.proxyTimeSyncChannels") + if err != nil { + log.Panic(err) + } + return strings.Split(chs, ",") +} + +func (p *ParamTable) ProxyTimeSyncSubName() string { + name, err := p.Load("master.proxyTimeSyncSubName") + if err != nil { + log.Panic(err) + } + return name +} + +func (p *ParamTable) SoftTimeTickBarrierInterval() typeutil.Timestamp { + t, err := p.Load("master.softTimeTickBarrierInterval") + if err != nil { + log.Panic(err) + } + v, err := strconv.ParseInt(t, 10, 64) + if err != nil { + log.Panic(err) + } + return tsoutil.ComposeTS(v, 0) +} + +func (p *ParamTable) WriteIDList() []typeutil.UniqueID { + id, err := p.Load("master.writeidlist") + if err != nil { + log.Panic(err) + } + ids := strings.Split(id, ",") + idlist := make([]typeutil.UniqueID, 0, len(ids)) + for _, i := range ids { + v, err := strconv.ParseInt(i, 10, 64) + if err != nil { + log.Panicf("load proxy id list error, %s", err.Error()) + } + idlist = append(idlist, typeutil.UniqueID(v)) + } + return idlist +} + +func (p *ParamTable) WriteTimeSyncChannels() []string { + chs, err := p.Load("master.writeTimeSyncChannels") + if err != nil { + log.Fatal(err) + } + return strings.Split(chs, ",") +} + +func (p *ParamTable) WriteTimeSyncSubName() string { + name, err := p.Load("master.writeTimeSyncSubName") + if err != nil { + log.Fatal(err) + } + return name +} + +func (p *ParamTable) DMTimeSyncChannels() []string { + chs, err := p.Load("master.dmTimeSyncChannels") + if err != nil { + log.Fatal(err) + } + return strings.Split(chs, ",") +} + +func (p *ParamTable) K2STimeSyncChannels() []string { + chs, err := p.Load("master.k2sTimeSyncChannels") + if err != nil { + log.Fatal(err) + } + return strings.Split(chs, ",") +} diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go index c67f4e58f3..ffc24e382c 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/reader/data_sync_service.go @@ -37,7 +37,6 @@ func (dsService *dataSyncService) initNodes() { // TODO: add delete pipeline support dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - flowgraph.Params.Init() var dmStreamNode Node = newDmInputNode(dsService.ctx) var filterDmNode Node = newFilteredDmNode() diff --git a/internal/reader/flow_graph_delete_node.go b/internal/reader/flow_graph_delete_node.go index b0a81e9d80..0a45357d6e 100644 --- a/internal/reader/flow_graph_delete_node.go +++ b/internal/reader/flow_graph_delete_node.go @@ -1,7 +1,5 @@ package reader -import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" - type deleteNode struct { BaseNode deleteMsg deleteMsg @@ -16,8 +14,8 @@ func (dNode *deleteNode) Operate(in []*Msg) []*Msg { } func newDeleteNode() *deleteNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/flow_graph_filter_dm_node.go b/internal/reader/flow_graph_filter_dm_node.go index f32dd087ad..ddff7e5868 100644 --- a/internal/reader/flow_graph_filter_dm_node.go +++ b/internal/reader/flow_graph_filter_dm_node.go @@ -5,7 +5,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type filterDmNode struct { @@ -55,8 +54,8 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } func newFilteredDmNode() *filterDmNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index 2ec7c2dc54..2d56e0bd57 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type insertNode struct { @@ -127,8 +126,8 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn } func newInsertNode(replica *collectionReplica) *insertNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/flow_graph_key2seg_node.go b/internal/reader/flow_graph_key2seg_node.go index 399533f88d..112ff9876b 100644 --- a/internal/reader/flow_graph_key2seg_node.go +++ b/internal/reader/flow_graph_key2seg_node.go @@ -1,7 +1,5 @@ package reader -import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" - type key2SegNode struct { BaseNode key2SegMsg key2SegMsg @@ -16,8 +14,8 @@ func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg { } func newKey2SegNode() *key2SegNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/reader/flow_graph_msg_stream_input_nodes.go index fd580956da..2ebbd6cd6d 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/reader/flow_graph_msg_stream_input_nodes.go @@ -9,10 +9,10 @@ import ( ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - receiveBufSize := Params.dmMsgStreamReceiveBufSize() + receiveBufSize := Params.dmReceiveBufSize() pulsarBufSize := Params.dmPulsarBufSize() - msgStreamURL, err := Params.PulsarAddress() + msgStreamURL, err := Params.pulsarAddress() if err != nil { log.Fatal(err) } @@ -27,6 +27,9 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { var stream msgstream.MsgStream = insertStream - node := flowgraph.NewInputNode(&stream, "dmInputNode") + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() + + node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism) return node } diff --git a/internal/reader/flow_graph_schema_update_node.go b/internal/reader/flow_graph_schema_update_node.go index 3262b0b1f6..160f31226e 100644 --- a/internal/reader/flow_graph_schema_update_node.go +++ b/internal/reader/flow_graph_schema_update_node.go @@ -1,7 +1,5 @@ package reader -import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" - type schemaUpdateNode struct { BaseNode schemaUpdateMsg schemaUpdateMsg @@ -16,8 +14,8 @@ func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg { } func newSchemaUpdateNode() *schemaUpdateNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 8271df117d..b314097b19 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -2,8 +2,6 @@ package reader import ( "log" - - "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type serviceTimeNode struct { @@ -35,8 +33,8 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode { - maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() - maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + maxQueueLength := Params.flowGraphMaxQueueLength() + maxParallelism := Params.flowGraphMaxParallelism() baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 2b98161b2a..55167922d4 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -126,8 +126,8 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b } Params.Init() - var queryNodeChannelStart = Params.TopicStart() - var queryNodeChannelEnd = Params.TopicEnd() + var queryNodeChannelStart = Params.topicStart() + var queryNodeChannelEnd = Params.topicEnd() if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) { return true diff --git a/internal/reader/param_table.go b/internal/reader/param_table.go index 9e3a048fb1..0a12af03d0 100644 --- a/internal/reader/param_table.go +++ b/internal/reader/param_table.go @@ -20,7 +20,7 @@ func (p *ParamTable) Init() { } } -func (p *ParamTable) PulsarAddress() (string, error) { +func (p *ParamTable) pulsarAddress() (string, error) { url, err := p.Load("_PulsarAddress") if err != nil { panic(err) @@ -28,7 +28,7 @@ func (p *ParamTable) PulsarAddress() (string, error) { return "pulsar://" + url, nil } -func (p *ParamTable) QueryNodeID() int { +func (p *ParamTable) queryNodeID() int { queryNodeID, err := p.Load("reader.clientid") if err != nil { panic(err) @@ -40,7 +40,7 @@ func (p *ParamTable) QueryNodeID() int { return id } -func (p *ParamTable) TopicStart() int { +func (p *ParamTable) topicStart() int { topicStart, err := p.Load("reader.topicstart") if err != nil { panic(err) @@ -52,7 +52,7 @@ func (p *ParamTable) TopicStart() int { return topicStartNum } -func (p *ParamTable) TopicEnd() int { +func (p *ParamTable) topicEnd() int { topicEnd, err := p.Load("reader.topicend") if err != nil { panic(err) @@ -64,9 +64,10 @@ func (p *ParamTable) TopicEnd() int { return topicEndNum } -// private advanced params -func (p *ParamTable) statsServiceTimeInterval() int { - timeInterval, err := p.Load("service.statsServiceTimeInterval") +// advanced params +// stats +func (p *ParamTable) statsPublishInterval() int { + timeInterval, err := p.Load("reader.stats.publishInterval") if err != nil { panic(err) } @@ -77,8 +78,34 @@ func (p *ParamTable) statsServiceTimeInterval() int { return interval } -func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 { - revBufSize, err := p.Load("msgStream.receiveBufSize.statsMsgStream") +// dataSync: +func (p *ParamTable) flowGraphMaxQueueLength() int32 { + queueLength, err := p.Load("reader.dataSync.flowGraph.maxQueueLength") + if err != nil { + panic(err) + } + length, err := strconv.Atoi(queueLength) + if err != nil { + panic(err) + } + return int32(length) +} + +func (p *ParamTable) flowGraphMaxParallelism() int32 { + maxParallelism, err := p.Load("reader.dataSync.flowGraph.maxParallelism") + if err != nil { + panic(err) + } + maxPara, err := strconv.Atoi(maxParallelism) + if err != nil { + panic(err) + } + return int32(maxPara) +} + +// msgStream +func (p *ParamTable) dmReceiveBufSize() int64 { + revBufSize, err := p.Load("reader.msgStream.dm.recvBufSize") if err != nil { panic(err) } @@ -89,32 +116,20 @@ func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 { return int64(bufSize) } -func (p *ParamTable) dmMsgStreamReceiveBufSize() int64 { - revBufSize, err := p.Load("msgStream.receiveBufSize.dmMsgStream") +func (p *ParamTable) dmPulsarBufSize() int64 { + pulsarBufSize, err := p.Load("reader.msgStream.dm.pulsarBufSize") if err != nil { panic(err) } - bufSize, err := strconv.Atoi(revBufSize) + bufSize, err := strconv.Atoi(pulsarBufSize) if err != nil { panic(err) } return int64(bufSize) } -func (p *ParamTable) searchMsgStreamReceiveBufSize() int64 { - revBufSize, err := p.Load("msgStream.receiveBufSize.searchMsgStream") - if err != nil { - panic(err) - } - bufSize, err := strconv.Atoi(revBufSize) - if err != nil { - panic(err) - } - return int64(bufSize) -} - -func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 { - revBufSize, err := p.Load("msgStream.receiveBufSize.searchResultMsgStream") +func (p *ParamTable) searchReceiveBufSize() int64 { + revBufSize, err := p.Load("reader.msgStream.search.recvBufSize") if err != nil { panic(err) } @@ -126,7 +141,7 @@ func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 { } func (p *ParamTable) searchPulsarBufSize() int64 { - pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.search") + pulsarBufSize, err := p.Load("reader.msgStream.search.pulsarBufSize") if err != nil { panic(err) } @@ -137,12 +152,24 @@ func (p *ParamTable) searchPulsarBufSize() int64 { return int64(bufSize) } -func (p *ParamTable) dmPulsarBufSize() int64 { - pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.dm") +func (p *ParamTable) searchResultReceiveBufSize() int64 { + revBufSize, err := p.Load("reader.msgStream.searchResult.recvBufSize") if err != nil { panic(err) } - bufSize, err := strconv.Atoi(pulsarBufSize) + bufSize, err := strconv.Atoi(revBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) statsReceiveBufSize() int64 { + revBufSize, err := p.Load("reader.msgStream.stats.recvBufSize") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(revBufSize) if err != nil { panic(err) } diff --git a/internal/reader/param_table_test.go b/internal/reader/param_table_test.go index 7580aadd10..f52220f7d5 100644 --- a/internal/reader/param_table_test.go +++ b/internal/reader/param_table_test.go @@ -12,56 +12,56 @@ func TestParamTable_Init(t *testing.T) { func TestParamTable_PulsarAddress(t *testing.T) { Params.Init() - address, err := Params.PulsarAddress() + address, err := Params.pulsarAddress() assert.NoError(t, err) assert.Equal(t, address, "pulsar://localhost:6650") } func TestParamTable_QueryNodeID(t *testing.T) { Params.Init() - id := Params.QueryNodeID() + id := Params.queryNodeID() assert.Equal(t, id, 0) } func TestParamTable_TopicStart(t *testing.T) { Params.Init() - topicStart := Params.TopicStart() + topicStart := Params.topicStart() assert.Equal(t, topicStart, 0) } func TestParamTable_TopicEnd(t *testing.T) { Params.Init() - topicEnd := Params.TopicEnd() + topicEnd := Params.topicEnd() assert.Equal(t, topicEnd, 128) } func TestParamTable_statsServiceTimeInterval(t *testing.T) { Params.Init() - interval := Params.statsServiceTimeInterval() + interval := Params.statsPublishInterval() assert.Equal(t, interval, 1000) } func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) { Params.Init() - bufSize := Params.statsMsgStreamReceiveBufSize() + bufSize := Params.statsReceiveBufSize() assert.Equal(t, bufSize, int64(64)) } func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) { Params.Init() - bufSize := Params.dmMsgStreamReceiveBufSize() + bufSize := Params.dmReceiveBufSize() assert.Equal(t, bufSize, int64(1024)) } func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) { Params.Init() - bufSize := Params.searchMsgStreamReceiveBufSize() + bufSize := Params.searchReceiveBufSize() assert.Equal(t, bufSize, int64(512)) } func TestParamTable_searchResultMsgStreamReceiveBufSize(t *testing.T) { Params.Init() - bufSize := Params.searchResultMsgStreamReceiveBufSize() + bufSize := Params.searchResultReceiveBufSize() assert.Equal(t, bufSize, int64(64)) } @@ -76,3 +76,15 @@ func TestParamTable_dmPulsarBufSize(t *testing.T) { bufSize := Params.dmPulsarBufSize() assert.Equal(t, bufSize, int64(1024)) } + +func TestParamTable_flowGraphMaxQueueLength(t *testing.T) { + Params.Init() + length := Params.flowGraphMaxQueueLength() + assert.Equal(t, length, int32(1024)) +} + +func TestParamTable_flowGraphMaxParallelism(t *testing.T) { + Params.Init() + maxParallelism := Params.flowGraphMaxParallelism() + assert.Equal(t, maxParallelism, int32(1024)) +} diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index bad9b00d46..e66ed8684a 100644 --- a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -35,10 +35,10 @@ type SearchResult struct { } func newSearchService(ctx context.Context, replica *collectionReplica) *searchService { - receiveBufSize := Params.searchMsgStreamReceiveBufSize() + receiveBufSize := Params.searchReceiveBufSize() pulsarBufSize := Params.searchPulsarBufSize() - msgStreamURL, err := Params.PulsarAddress() + msgStreamURL, err := Params.pulsarAddress() if err != nil { log.Fatal(err) } diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 35c81a9dfe..859c4786b7 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -28,11 +28,11 @@ func newStatsService(ctx context.Context, replica *collectionReplica) *statsServ } func (sService *statsService) start() { - sleepTimeInterval := Params.statsServiceTimeInterval() - receiveBufSize := Params.statsMsgStreamReceiveBufSize() + sleepTimeInterval := Params.statsPublishInterval() + receiveBufSize := Params.statsReceiveBufSize() // start pulsar - msgStreamURL, err := Params.PulsarAddress() + msgStreamURL, err := Params.pulsarAddress() if err != nil { log.Fatal(err) } diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 7251f59044..7c4271b23b 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -45,10 +45,7 @@ func (inNode *InputNode) Operate(in []*Msg) []*Msg { return []*Msg{&msgStreamMsg} } -func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode { - maxQueueLength := Params.FlowGraphMaxQueueLength() - maxParallelism := Params.FlowGraphMaxParallelism() - +func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/util/flowgraph/param_table.go b/internal/util/flowgraph/param_table.go deleted file mode 100644 index 5378932d9b..0000000000 --- a/internal/util/flowgraph/param_table.go +++ /dev/null @@ -1,45 +0,0 @@ -package flowgraph - -import ( - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/util/paramtable" -) - -type ParamTable struct { - paramtable.BaseTable -} - -var Params ParamTable - -func (p *ParamTable) Init() { - p.BaseTable.Init() - err := p.LoadYaml("advanced/flow_graph.yaml") - if err != nil { - panic(err) - } -} - -func (p *ParamTable) FlowGraphMaxQueueLength() int32 { - queueLength, err := p.Load("flowGraph.maxQueueLength") - if err != nil { - panic(err) - } - length, err := strconv.Atoi(queueLength) - if err != nil { - panic(err) - } - return int32(length) -} - -func (p *ParamTable) FlowGraphMaxParallelism() int32 { - maxParallelism, err := p.Load("flowGraph.maxParallelism") - if err != nil { - panic(err) - } - maxPara, err := strconv.Atoi(maxParallelism) - if err != nil { - panic(err) - } - return int32(maxPara) -} diff --git a/internal/util/flowgraph/param_table_test.go b/internal/util/flowgraph/param_table_test.go deleted file mode 100644 index 136c8854c9..0000000000 --- a/internal/util/flowgraph/param_table_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package flowgraph - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestParamTable_flowGraphMaxQueueLength(t *testing.T) { - Params.Init() - length := Params.FlowGraphMaxQueueLength() - assert.Equal(t, length, int32(1024)) -} - -func TestParamTable_flowGraphMaxParallelism(t *testing.T) { - Params.Init() - maxParallelism := Params.FlowGraphMaxParallelism() - assert.Equal(t, maxParallelism, int32(1024)) -} diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index 9b8ceb43f5..7d7afa9835 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -12,7 +12,7 @@ package paramtable import ( - "fmt" + "log" "path" "runtime" "strconv" @@ -20,6 +20,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/spf13/cast" "github.com/spf13/viper" ) @@ -86,11 +87,34 @@ func (gp *BaseTable) LoadYaml(fileName string) error { } for _, key := range config.AllKeys() { - fmt.Println(key) - err := gp.params.Save(strings.ToLower(key), config.GetString(key)) + val := config.Get(key) + str, err := cast.ToStringE(val) + if err != nil { + switch val := val.(type) { + case []interface{}: + str = str[:0] + for _, v := range val { + ss, err := cast.ToStringE(v) + if err != nil { + log.Panic(err) + } + if len(str) == 0 { + str = ss + } else { + str = str + "," + ss + } + } + + default: + log.Panicf("undefine config type, key=%s", key) + } + } + log.Printf("%s : %s", key, str) + err = gp.params.Save(strings.ToLower(key), str) if err != nil { panic(err) } + } return nil diff --git a/tools/core_gen/all_generate.py b/tools/core_gen/all_generate.py index 3ce44022f0..fec35bf2da 100755 --- a/tools/core_gen/all_generate.py +++ b/tools/core_gen/all_generate.py @@ -48,10 +48,16 @@ if __name__ == "__main__": node_names = ["Expr", "PlanNode"] visitor_info = { - 'Expr': [{ - 'visitor_name': "ShowExprVisitor", - "parameter_name": 'expr', - }], + 'Expr': [ + { + 'visitor_name': "ShowExprVisitor", + "parameter_name": 'expr', + }, + { + 'visitor_name': "ExecExprVisitor", + "parameter_name": 'expr', + }, + ], 'PlanNode': [ { 'visitor_name': "ShowPlanNodeVisitor", diff --git a/tools/core_gen/templates/visitor_derived.h b/tools/core_gen/templates/visitor_derived.h index 49d31fa44c..cda1ea1a74 100644 --- a/tools/core_gen/templates/visitor_derived.h +++ b/tools/core_gen/templates/visitor_derived.h @@ -2,7 +2,7 @@ @@root_base@@Visitor #### @@@@body@struct_name - virtual void + void visit(@@struct_name@@& @@parameter_name@@) override; #### @@@@main @@ -21,4 +21,4 @@ class @@visitor_name@@ : @@base_visitor@@ { }; } // namespace @@namespace@@ -#### \ No newline at end of file +####