Set master config

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
neza2017 2020-11-19 10:46:17 +08:00 committed by yefu.chen
parent c3de667bf5
commit ce89f64bb3
46 changed files with 459 additions and 247 deletions


@@ -21,6 +21,7 @@ func main() {
 	etcdAddress, _ := masterParams.Params.EtcdAddress()
 	etcdRootPath, _ := masterParams.Params.EtcdRootPath()
 	pulsarAddr, _ := masterParams.Params.PulsarAddress()
+	pulsarAddr = "pulsar://" + pulsarAddr
 	defaultRecordSize := masterParams.Params.DefaultRecordSize()
 	minimumAssignSize := masterParams.Params.MinimumAssignSize()
 	segmentThreshold := masterParams.Params.SegmentThreshold()
@@ -34,15 +35,15 @@ func main() {
 		MetaRootPath:        etcdRootPath,
 		EtcdAddr:            []string{etcdAddress},
 		PulsarAddr:          pulsarAddr,
-		ProxyIDs:            nil,
-		PulsarProxyChannels: nil,
-		PulsarProxySubName:  "",
-		SoftTTBInterval:     0,
-		WriteIDs:            nil,
-		PulsarWriteChannels: nil,
-		PulsarWriteSubName:  "",
-		PulsarDMChannels:    nil,
-		PulsarK2SChannels:   nil,
+		ProxyIDs:            masterParams.Params.ProxyIDList(),
+		PulsarProxyChannels: masterParams.Params.ProxyTimeSyncChannels(),
+		PulsarProxySubName:  masterParams.Params.ProxyTimeSyncSubName(),
+		SoftTTBInterval:     masterParams.Params.SoftTimeTickBarrierInterval(),
+		WriteIDs:            masterParams.Params.WriteIDList(),
+		PulsarWriteChannels: masterParams.Params.WriteTimeSyncChannels(),
+		PulsarWriteSubName:  masterParams.Params.WriteTimeSyncSubName(),
+		PulsarDMChannels:    masterParams.Params.DMTimeSyncChannels(),
+		PulsarK2SChannels:   masterParams.Params.K2STimeSyncChannels(),
 		DefaultRecordSize:   defaultRecordSize,
 		MinimumAssignSize:   minimumAssignSize,
 		SegmentThreshold:    segmentThreshold,


@@ -1,14 +0,0 @@
-# Copyright (C) 2019-2020 Zilliz. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License
-# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-# or implied. See the License for the specific language governing permissions and limitations under the License.
-
-flowGraph:
-  maxQueueLength: 1024
-  maxParallelism: 1024


@@ -9,15 +9,26 @@
 # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 # or implied. See the License for the specific language governing permissions and limitations under the License.
-service:
-  statsServiceTimeInterval: 1000 # milliseconds
-
-msgStream:
-  receiveBufSize: # msgPack chan buffer size
-    statsMsgStream: 64
-    dmMsgStream: 1024
-    searchMsgStream: 512
-    searchResultMsgStream: 64
-  pulsarBufSize: # pulsar chan buffer size
-    search: 512
-    dm: 1024
+reader:
+  stats:
+    publishInterval: 1000 # milliseconds
+
+  dataSync:
+    flowGraph:
+      maxQueueLength: 1024
+      maxParallelism: 1024
+
+  msgStream:
+    dm:
+      recvBufSize: 1024 # msgPack chan buffer size
+      pulsarBufSize: 1024 # pulsar chan buffer size
+    search:
+      recvBufSize: 512
+      pulsarBufSize: 512
+    searchResult:
+      recvBufSize: 64
+    stats:
+      recvBufSize: 64


@@ -14,11 +14,23 @@ master:
   port: 53100
   pulsarmoniterinterval: 1
   pulsartopic: "monitor-topic"
+  proxyidlist: [1, 2]
+  proxyTimeSyncChannels: ["proxy1", "proxy2"]
+  proxyTimeSyncSubName: "proxy-topic"
+  softTimeTickBarrierInterval: 500
+  writeidlist: [3, 4]
+  writeTimeSyncChannels: ["write3", "write4"]
+  writeTimeSyncSubName: "write-topic"
+  dmTimeSyncChannels: ["dm5", "dm6"]
+  k2sTimeSyncChannels: ["k2s7", "k2s8"]
   defaultSizePerRecord: 1024
   minimumAssignSize: 1048576
   segmentThreshold: 536870912
   segmentExpireDuration: 2000
-  proxyidlist: [0]
   querynodenum: 1
   writenodenum: 1
   statsChannels: "statistic"

go.mod

@@ -31,6 +31,7 @@ require (
 	github.com/sirupsen/logrus v1.6.0 // indirect
 	github.com/smartystreets/goconvey v1.6.4 // indirect
 	github.com/spaolacci/murmur3 v1.1.0
+	github.com/spf13/cast v1.3.0
 	github.com/spf13/viper v1.7.1
 	github.com/stretchr/testify v1.6.1
 	github.com/tikv/client-go v0.0.0-20200824032810-95774393107b


@@ -6,6 +6,7 @@ set(MILVUS_QUERY_SRCS
     visitors/ShowPlanNodeVisitor.cpp
     visitors/ExecPlanNodeVisitor.cpp
     visitors/ShowExprVisitor.cpp
+    visitors/ExecExprVisitor.cpp
     Plan.cpp
 )
 add_library(milvus_query ${MILVUS_QUERY_SRCS})


@@ -22,18 +22,10 @@ using ExprPtr = std::unique_ptr<Expr>;
 struct BinaryExpr : Expr {
     ExprPtr left_;
     ExprPtr right_;
-
- public:
-    void
-    accept(ExprVisitor&) = 0;
 };

 struct UnaryExpr : Expr {
     ExprPtr child_;
-
- public:
-    void
-    accept(ExprVisitor&) = 0;
 };

 // TODO: not enabled in sprint 1
@@ -60,7 +52,7 @@ using FieldId = std::string;
 struct TermExpr : Expr {
     FieldId field_id_;
-    DataType data_type_;
+    DataType data_type_ = DataType::NONE;
     // std::vector<std::any> terms_;

 protected:
@@ -74,7 +66,7 @@ struct TermExpr : Expr {
 struct RangeExpr : Expr {
     FieldId field_id_;
-    DataType data_type_;
+    DataType data_type_ = DataType::NONE;
     enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
     static const std::map<std::string, OpType> mapping_;  // op_name -> op


@@ -52,8 +52,10 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js
     expr->data_type_ = data_type;
     expr->field_id_ = field_name;
     for (auto& item : body.items()) {
-        auto& op_name = item.key();
-        auto op = RangeExpr::mapping_.at(to_lower(op_name));
+        auto op_name = to_lower(item.key());
+        AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found");
+        auto op = RangeExpr::mapping_.at(op_name);
         T value = item.value();
         expr->conditions_.emplace_back(op, value);
     }
@@ -130,7 +132,6 @@ CreatePlan(const Schema& schema, const std::string& dsl_str) {
 std::unique_ptr<PlaceholderGroup>
 ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
-    (void)plan;
     namespace ser = milvus::proto::service;
     auto result = std::make_unique<PlaceholderGroup>();
     ser::PlaceholderGroup ph_group;
@@ -139,9 +140,14 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
     for (auto& info : ph_group.placeholders()) {
         Placeholder element;
         element.tag_ = info.tag();
+        Assert(plan->tag2field_.count(element.tag_));
+        auto field_id = plan->tag2field_.at(element.tag_);
+        auto& field_meta = plan->schema_[field_id];
         element.num_of_queries_ = info.values_size();
         AssertInfo(element.num_of_queries_, "must have queries");
+        Assert(element.num_of_queries_ > 0);
         element.line_sizeof_ = info.values().Get(0).size();
+        Assert(field_meta.get_sizeof() == element.line_sizeof_);
         auto& target = element.blob_;
         target.reserve(element.line_sizeof_ * element.num_of_queries_);
         for (auto& line : info.values()) {


@@ -14,7 +14,7 @@ using Json = nlohmann::json;
 // class definitions
 struct Plan {
  public:
-    Plan(const Schema& schema) : schema_(schema) {
+    explicit Plan(const Schema& schema) : schema_(schema) {
     }

  public:


@@ -38,10 +38,6 @@ struct VectorPlanNode : PlanNode {
     std::optional<ExprPtr> predicate_;
     QueryInfo query_info_;
     std::string placeholder_tag_;
-
- public:
-    virtual void
-    accept(PlanNodeVisitor&) = 0;
 };

 struct FloatVectorANNS : VectorPlanNode {


@@ -0,0 +1,25 @@
+#error TODO: copy this file out, and modify the content.
+#include "query/generated/ExecExprVisitor.h"
+
+namespace milvus::query {
+void
+ExecExprVisitor::visit(BoolUnaryExpr& expr) {
+    // TODO
+}
+
+void
+ExecExprVisitor::visit(BoolBinaryExpr& expr) {
+    // TODO
+}
+
+void
+ExecExprVisitor::visit(TermExpr& expr) {
+    // TODO
+}
+
+void
+ExecExprVisitor::visit(RangeExpr& expr) {
+    // TODO
+}
+
+}  // namespace milvus::query


@@ -0,0 +1,41 @@
+#pragma once
+// Generated File
+// DO NOT EDIT
+#include "segcore/SegmentNaive.h"
+#include <optional>
+#include "ExprVisitor.h"
+
+namespace milvus::query {
+class ExecExprVisitor : ExprVisitor {
+ public:
+    void
+    visit(BoolUnaryExpr& expr) override;
+
+    void
+    visit(BoolBinaryExpr& expr) override;
+
+    void
+    visit(TermExpr& expr) override;
+
+    void
+    visit(RangeExpr& expr) override;
+
+ public:
+    using RetType = faiss::ConcurrentBitsetPtr;
+    explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) {
+    }
+    RetType
+    call_child(Expr& expr) {
+        Assert(!ret_.has_value());
+        expr.accept(*this);
+        Assert(ret_.has_value());
+        auto ret = std::move(ret_);
+        ret_ = std::nullopt;
+        return std::move(ret.value());
+    }
+
+ private:
+    segcore::SegmentNaive& segment_;
+    std::optional<RetType> ret_;
+};
+}  // namespace milvus::query
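call_child is the standard trick for giving a void-returning visitor a return value: visit() parks its result in the optional ret_ member, and call_child() asserts the slot was empty, dispatches, then moves the result out. A minimal Go sketch of the same pattern — all names below (Expr, Bitset, ExecVisitor) are hypothetical illustrations, not repo APIs:

package main

import "fmt"

// Bitset stands in for faiss::ConcurrentBitsetPtr.
type Bitset []bool

type Expr interface{ Accept(v *ExecVisitor) }

type RangeExpr struct{ Field string }

func (e *RangeExpr) Accept(v *ExecVisitor) { v.VisitRange(e) }

// ExecVisitor parks each child's result in ret between Accept and CallChild,
// mirroring the std::optional<RetType> ret_ member above.
type ExecVisitor struct{ ret Bitset }

func (v *ExecVisitor) VisitRange(e *RangeExpr) {
	v.ret = make(Bitset, 4) // a real visitor would evaluate e against the segment
}

// CallChild asserts the slot is empty, dispatches, then moves the result out.
func (v *ExecVisitor) CallChild(e Expr) Bitset {
	if v.ret != nil {
		panic("ret_ must be empty before visiting")
	}
	e.Accept(v)
	if v.ret == nil {
		panic("visit must fill ret_")
	}
	r := v.ret
	v.ret = nil
	return r
}

func main() {
	v := &ExecVisitor{}
	fmt.Println(len(v.CallChild(&RangeExpr{Field: "age"}))) // 4
}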


@@ -9,10 +9,10 @@
 namespace milvus::query {
 class ExecPlanNodeVisitor : PlanNodeVisitor {
  public:
-    virtual void
+    void
     visit(FloatVectorANNS& node) override;

-    virtual void
+    void
     visit(BinaryVectorANNS& node) override;

  public:


@@ -9,16 +9,16 @@
 namespace milvus::query {
 class ShowExprVisitor : ExprVisitor {
  public:
-    virtual void
+    void
     visit(BoolUnaryExpr& expr) override;

-    virtual void
+    void
     visit(BoolBinaryExpr& expr) override;

-    virtual void
+    void
     visit(TermExpr& expr) override;

-    virtual void
+    void
     visit(RangeExpr& expr) override;

  public:


@@ -10,10 +10,10 @@
 namespace milvus::query {
 class ShowPlanNodeVisitor : PlanNodeVisitor {
  public:
-    virtual void
+    void
     visit(FloatVectorANNS& node) override;

-    virtual void
+    void
     visit(BinaryVectorANNS& node) override;

  public:


@@ -0,0 +1,51 @@
+#include "segcore/SegmentNaive.h"
+#include <optional>
+#include "query/generated/ExecExprVisitor.h"
+
+namespace milvus::query {
+
+#if 1
+// THIS CONTAINS EXTRA BODY FOR VISITOR
+// WILL BE USED BY GENERATOR
+namespace impl {
+class ExecExprVisitor : ExprVisitor {
+ public:
+    using RetType = faiss::ConcurrentBitsetPtr;
+    explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) {
+    }
+    RetType
+    call_child(Expr& expr) {
+        Assert(!ret_.has_value());
+        expr.accept(*this);
+        Assert(ret_.has_value());
+        auto ret = std::move(ret_);
+        ret_ = std::nullopt;
+        return std::move(ret.value());
+    }
+
+ private:
+    segcore::SegmentNaive& segment_;
+    std::optional<RetType> ret_;
+};
+}  // namespace impl
+#endif
+
+void
+ExecExprVisitor::visit(BoolUnaryExpr& expr) {
+    PanicInfo("unimplemented");
+}
+
+void
+ExecExprVisitor::visit(BoolBinaryExpr& expr) {
+    PanicInfo("unimplemented");
+}
+
+void
+ExecExprVisitor::visit(TermExpr& expr) {
+    PanicInfo("unimplemented");
+}
+
+void
+ExecExprVisitor::visit(RangeExpr& expr) {
+}
+}  // namespace milvus::query


@@ -3,7 +3,6 @@ set(SEGCORE_FILES
     SegmentNaive.cpp
     SegmentSmallIndex.cpp
     IndexMeta.cpp
-    ConcurrentVector.cpp
     Collection.cpp
     collection_c.cpp
     segment_c.cpp


@@ -4,7 +4,6 @@
 #include "pb/etcd_meta.pb.h"
 #include <google/protobuf/text_format.h>
 #include <knowhere/index/vector_index/adapter/VectorAdapter.h>
-#include <cstring>

 namespace milvus::segcore {
@@ -134,7 +133,7 @@ Collection::parse() {
     auto schema = std::make_shared<Schema>();
     for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) {
         const auto& type_params = child.type_params();
-        int dim = 16;
+        int64_t dim = 16;
         for (const auto& type_param : type_params) {
             if (type_param.key() == "dim") {
                 dim = strtoll(type_param.value().c_str(), nullptr, 10);


@@ -1,5 +0,0 @@
-#include <iostream>
-
-#include "segcore/ConcurrentVector.h"
-
-namespace milvus::segcore {}


@@ -11,7 +11,8 @@ IndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBas
     assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT);
     auto dim = field_meta_.get_dim();

-    auto source = static_cast<const ConcurrentVector<float>*>(vec_base);
+    auto source = dynamic_cast<const ConcurrentVector<float>*>(vec_base);
+    Assert(source);
     auto chunk_size = source->chunk_size();
     assert(ack_end <= chunk_size);
     auto conf = get_build_conf();


@@ -451,10 +451,10 @@ SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp ti
 Status
 SegmentNaive::Close() {
     if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
-        std::runtime_error("insert not ready");
+        PanicInfo("insert not ready");
     }
-    if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) {
-        std::runtime_error("delete not ready");
+    if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
+        PanicInfo("delete not ready");
     }
     state_ = SegmentState::Closed;
     return Status::OK();


@@ -20,8 +20,6 @@
 namespace milvus::segcore {
 class SegmentNaive : public SegmentBase {
  public:
-    virtual ~SegmentNaive() = default;
-
     // SegmentBase(std::shared_ptr<FieldsInfo> collection);

     int64_t
@@ -47,7 +45,7 @@ class SegmentNaive : public SegmentBase {
     Status
     QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;

-    virtual Status
+    Status
     Search(const query::Plan* Plan,
            const query::PlaceholderGroup* placeholder_groups[],
            const Timestamp timestamps[],
@@ -106,7 +104,7 @@ class SegmentNaive : public SegmentBase {
     friend std::unique_ptr<SegmentBase>
     CreateSegment(SchemaPtr schema);

-    explicit SegmentNaive(SchemaPtr schema) : schema_(schema), record_(*schema) {
+    explicit SegmentNaive(const SchemaPtr& schema) : schema_(schema), record_(*schema) {
     }

 private:
private: private:


@@ -50,7 +50,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier,
         for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
             // get uid in delete logs
             auto uid = deleted_record_.uids_[del_index];
-            // map uid to corrensponding offsets, select the max one, which should be the target
+            // map uid to corresponding offsets, select the max one, which should be the target
             // the max one should be closest to query_timestamp, so the delete log should refer to it
             int64_t the_offset = -1;
             auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
@@ -73,7 +73,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier,
         for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
             // get uid in delete logs
             auto uid = deleted_record_.uids_[del_index];
-            // map uid to corrensponding offsets, select the max one, which should be the target
+            // map uid to corresponding offsets, select the max one, which should be the target
             // the max one should be closest to query_timestamp, so the delete log should refer to it
             int64_t the_offset = -1;
             auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
@@ -228,7 +228,7 @@ SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info,
                                        QueryResult& results) {
     // step 1: binary search to find the barrier of the snapshot
     auto ins_barrier = get_barrier(record_, timestamp);
-    auto del_barrier = get_barrier(deleted_record_, timestamp);
+    // auto del_barrier = get_barrier(deleted_record_, timestamp);
 #if 0
     auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
     Assert(bitmap_holder);
@@ -321,7 +321,6 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta
             x = dis(e);
         }
     }
-    int64_t inferred_dim = query_info->query_raw_data.size() / query_info->num_queries;
     // TODO
     query::QueryInfo info{
         query_info->topK,
@@ -338,10 +337,10 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta
 Status
 SegmentSmallIndex::Close() {
     if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
-        std::runtime_error("insert not ready");
+        PanicInfo("insert not ready");
     }
-    if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) {
-        std::runtime_error("delete not ready");
+    if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
+        PanicInfo("delete not ready");
     }
     state_ = SegmentState::Closed;
     return Status::OK();
@@ -357,8 +356,6 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
     auto dim = field.get_dim();
     auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
-    auto chunk_size = record_.uids_.chunk_size();
-
     auto& uids = record_.uids_;
     auto entities = record_.get_vec_entity<float>(offset);
@@ -398,7 +395,7 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) {
         auto index_meta = std::make_shared<IndexMeta>(schema_);
         // TODO: this is merge of query conf and insert conf
-        // TODO: should be splitted into multiple configs
+        // TODO: should be split into multiple configs
         auto conf = milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100},
             {milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4},
@@ -431,8 +428,16 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) {
     }
     index_ready_ = true;
-#endif
     return Status::OK();
+#endif
+}
+
+static uint64_t
+upper_align(int64_t value, int64_t align) {
+    Assert(align > 0);
+    Assert((align & (align - 1)) == 0);
+    auto groups = (value + align - 1) / align;
+    return groups * align;
 }

 int64_t
@@ -448,9 +453,9 @@ SegmentSmallIndex::GetMemoryUsageInBytes() {
         }
     }
 #endif
-    int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
+    int64_t ins_n = upper_align(record_.reserved, DefaultElementPerChunk);
     total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1);
-    int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
+    int64_t del_n = upper_align(deleted_record_.reserved, DefaultElementPerChunk);
     total_bytes += del_n * (16 * 2);
     return total_bytes;
 }


@@ -46,10 +46,6 @@ namespace milvus::segcore {
 class SegmentSmallIndex : public SegmentBase {
  public:
-    virtual ~SegmentSmallIndex() = default;
-
-    // SegmentBase(std::shared_ptr<FieldsInfo> collection);
-
     int64_t
     PreInsert(int64_t size) override;
@@ -111,6 +107,9 @@ class SegmentSmallIndex : public SegmentBase {
     GetMemoryUsageInBytes() override;

  public:
+    void
+    get_insert_record();
+
     ssize_t
     get_row_count() const override {
         return record_.ack_responder_.GetAck();
@@ -130,7 +129,8 @@ class SegmentSmallIndex : public SegmentBase {
     friend std::unique_ptr<SegmentBase>
     CreateSegment(SchemaPtr schema);

-    explicit SegmentSmallIndex(SchemaPtr schema) : schema_(schema), record_(*schema_), indexing_record_(*schema_) {
+    explicit SegmentSmallIndex(SchemaPtr schema)
+        : schema_(std::move(schema)), record_(*schema_), indexing_record_(*schema_) {
     }

  public:


@@ -141,7 +141,7 @@ TEST(CApiTest, SearchTest) {
     auto blob = raw_group.SerializeAsString();

     auto plan = CreatePlan(collection, dsl_string);
-    auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length());
+    auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
     std::vector<CPlaceholderGroup> placeholderGroups;
     placeholderGroups.push_back(placeholderGroup);
     timestamps.clear();
@@ -228,7 +228,7 @@ TEST(CApiTest, BuildIndexTest) {
     auto blob = raw_group.SerializeAsString();

     auto plan = CreatePlan(collection, dsl_string);
-    auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length());
+    auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
     std::vector<CPlaceholderGroup> placeholderGroups;
     placeholderGroups.push_back(placeholderGroup);
     timestamps.clear();


@@ -133,7 +133,7 @@ TEST(Query, ParsePlaceholderGroup) {
     {
         "bool": {
             "vector": {
-                "Vec": {
+                "fakevec": {
                     "metric_type": "L2",
                     "params": {
                         "nprobe": 10


@ -1,9 +1,13 @@
package paramtable package paramtable
import ( import (
"log"
"strconv" "strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable" "github.com/zilliztech/milvus-distributed/internal/util/paramtable"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type ParamTable struct { type ParamTable struct {
@ -80,3 +84,97 @@ func (p *ParamTable) StatsChannels() string {
channels, _ := p.Load("master.statsChannels") channels, _ := p.Load("master.statsChannels")
return channels return channels
} }
func (p *ParamTable) ProxyIDList() []typeutil.UniqueID {
id, err := p.Load("master.proxyidlist")
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
idlist := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idlist = append(idlist, typeutil.UniqueID(v))
}
return idlist
}
func (p *ParamTable) ProxyTimeSyncChannels() []string {
chs, err := p.Load("master.proxyTimeSyncChannels")
if err != nil {
log.Panic(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) ProxyTimeSyncSubName() string {
name, err := p.Load("master.proxyTimeSyncSubName")
if err != nil {
log.Panic(err)
}
return name
}
func (p *ParamTable) SoftTimeTickBarrierInterval() typeutil.Timestamp {
t, err := p.Load("master.softTimeTickBarrierInterval")
if err != nil {
log.Panic(err)
}
v, err := strconv.ParseInt(t, 10, 64)
if err != nil {
log.Panic(err)
}
return tsoutil.ComposeTS(v, 0)
}
func (p *ParamTable) WriteIDList() []typeutil.UniqueID {
id, err := p.Load("master.writeidlist")
if err != nil {
log.Panic(err)
}
ids := strings.Split(id, ",")
idlist := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idlist = append(idlist, typeutil.UniqueID(v))
}
return idlist
}
func (p *ParamTable) WriteTimeSyncChannels() []string {
chs, err := p.Load("master.writeTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) WriteTimeSyncSubName() string {
name, err := p.Load("master.writeTimeSyncSubName")
if err != nil {
log.Fatal(err)
}
return name
}
func (p *ParamTable) DMTimeSyncChannels() []string {
chs, err := p.Load("master.dmTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) K2STimeSyncChannels() []string {
chs, err := p.Load("master.k2sTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
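Note that SoftTimeTickBarrierInterval does not return the raw milliseconds: it runs the value through tsoutil.ComposeTS(v, 0), so the interval comes back already encoded as a hybrid-timestamp delta. Assuming the usual PD-style TSO layout with 18 logical bits below the physical time (an assumption about tsoutil's internals, not shown in this diff), the conversion looks roughly like:

package main

import "fmt"

// composeTS is a hypothetical re-implementation of tsoutil.ComposeTS:
// physical milliseconds in the high bits, an 18-bit logical counter below.
const physicalShiftBits = 18

func composeTS(physical, logical int64) uint64 {
	return uint64((physical << physicalShiftBits) + logical)
}

func main() {
	// The 500 ms default from master.yaml becomes a timestamp delta
	// with empty logical bits: 500 << 18 = 131072000.
	fmt.Println(composeTS(500, 0))
}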


@@ -37,7 +37,6 @@ func (dsService *dataSyncService) initNodes() {
 	// TODO: add delete pipeline support

 	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
-	flowgraph.Params.Init()

 	var dmStreamNode Node = newDmInputNode(dsService.ctx)
 	var filterDmNode Node = newFilteredDmNode()


@@ -1,7 +1,5 @@
 package reader

-import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
-
 type deleteNode struct {
 	BaseNode
 	deleteMsg deleteMsg
@@ -16,8 +14,8 @@ func (dNode *deleteNode) Operate(in []*Msg) []*Msg {
 }

 func newDeleteNode() *deleteNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -5,7 +5,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
-	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )

 type filterDmNode struct {
@@ -55,8 +54,8 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 }

 func newFilteredDmNode() *filterDmNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -6,7 +6,6 @@ import (
 	"sync"

 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )

 type insertNode struct {
@@ -127,8 +126,8 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 }

 func newInsertNode(replica *collectionReplica) *insertNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -1,7 +1,5 @@
 package reader

-import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
-
 type key2SegNode struct {
 	BaseNode
 	key2SegMsg key2SegMsg
@@ -16,8 +14,8 @@ func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg {
 }

 func newKey2SegNode() *key2SegNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -9,10 +9,10 @@ import (
 )

 func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
-	receiveBufSize := Params.dmMsgStreamReceiveBufSize()
+	receiveBufSize := Params.dmReceiveBufSize()
 	pulsarBufSize := Params.dmPulsarBufSize()

-	msgStreamURL, err := Params.PulsarAddress()
+	msgStreamURL, err := Params.pulsarAddress()
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -27,6 +27,9 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
 	var stream msgstream.MsgStream = insertStream

-	node := flowgraph.NewInputNode(&stream, "dmInputNode")
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()
+
+	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
 	return node
 }


@@ -1,7 +1,5 @@
 package reader

-import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
-
 type schemaUpdateNode struct {
 	BaseNode
 	schemaUpdateMsg schemaUpdateMsg
@@ -16,8 +14,8 @@ func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg {
 }

 func newSchemaUpdateNode() *schemaUpdateNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -2,8 +2,6 @@ package reader

 import (
 	"log"
-
-	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )

 type serviceTimeNode struct {
@@ -35,8 +33,8 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
 }

 func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
-	maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
-	maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()

 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)


@@ -126,8 +126,8 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b
 	}

 	Params.Init()
-	var queryNodeChannelStart = Params.TopicStart()
-	var queryNodeChannelEnd = Params.TopicEnd()
+	var queryNodeChannelStart = Params.topicStart()
+	var queryNodeChannelEnd = Params.topicEnd()

 	if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) {
 		return true


@@ -20,7 +20,7 @@ func (p *ParamTable) Init() {
 	}
 }

-func (p *ParamTable) PulsarAddress() (string, error) {
+func (p *ParamTable) pulsarAddress() (string, error) {
 	url, err := p.Load("_PulsarAddress")
 	if err != nil {
 		panic(err)
@@ -28,7 +28,7 @@ func (p *ParamTable) PulsarAddress() (string, error) {
 	return "pulsar://" + url, nil
 }

-func (p *ParamTable) QueryNodeID() int {
+func (p *ParamTable) queryNodeID() int {
 	queryNodeID, err := p.Load("reader.clientid")
 	if err != nil {
 		panic(err)
@@ -40,7 +40,7 @@ func (p *ParamTable) QueryNodeID() int {
 	return id
 }

-func (p *ParamTable) TopicStart() int {
+func (p *ParamTable) topicStart() int {
 	topicStart, err := p.Load("reader.topicstart")
 	if err != nil {
 		panic(err)
@@ -52,7 +52,7 @@ func (p *ParamTable) TopicStart() int {
 	return topicStartNum
 }

-func (p *ParamTable) TopicEnd() int {
+func (p *ParamTable) topicEnd() int {
 	topicEnd, err := p.Load("reader.topicend")
 	if err != nil {
 		panic(err)
@@ -64,9 +64,10 @@ func (p *ParamTable) TopicEnd() int {
 	return topicEndNum
 }

-// private advanced params
-func (p *ParamTable) statsServiceTimeInterval() int {
-	timeInterval, err := p.Load("service.statsServiceTimeInterval")
+// advanced params
+// stats
+func (p *ParamTable) statsPublishInterval() int {
+	timeInterval, err := p.Load("reader.stats.publishInterval")
 	if err != nil {
 		panic(err)
 	}
@@ -77,8 +78,34 @@ func (p *ParamTable) statsPublishInterval() int {
 	return interval
 }

-func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 {
-	revBufSize, err := p.Load("msgStream.receiveBufSize.statsMsgStream")
+// dataSync:
+func (p *ParamTable) flowGraphMaxQueueLength() int32 {
+	queueLength, err := p.Load("reader.dataSync.flowGraph.maxQueueLength")
+	if err != nil {
+		panic(err)
+	}
+	length, err := strconv.Atoi(queueLength)
+	if err != nil {
+		panic(err)
+	}
+	return int32(length)
+}
+
+func (p *ParamTable) flowGraphMaxParallelism() int32 {
+	maxParallelism, err := p.Load("reader.dataSync.flowGraph.maxParallelism")
+	if err != nil {
+		panic(err)
+	}
+	maxPara, err := strconv.Atoi(maxParallelism)
+	if err != nil {
+		panic(err)
+	}
+	return int32(maxPara)
+}
+
+// msgStream
+func (p *ParamTable) dmReceiveBufSize() int64 {
+	revBufSize, err := p.Load("reader.msgStream.dm.recvBufSize")
 	if err != nil {
 		panic(err)
 	}
@@ -89,32 +116,20 @@ func (p *ParamTable) dmReceiveBufSize() int64 {
 	return int64(bufSize)
 }

-func (p *ParamTable) dmMsgStreamReceiveBufSize() int64 {
-	revBufSize, err := p.Load("msgStream.receiveBufSize.dmMsgStream")
+func (p *ParamTable) dmPulsarBufSize() int64 {
+	pulsarBufSize, err := p.Load("reader.msgStream.dm.pulsarBufSize")
 	if err != nil {
 		panic(err)
 	}
-	bufSize, err := strconv.Atoi(revBufSize)
+	bufSize, err := strconv.Atoi(pulsarBufSize)
 	if err != nil {
 		panic(err)
 	}
 	return int64(bufSize)
 }

-func (p *ParamTable) searchMsgStreamReceiveBufSize() int64 {
-	revBufSize, err := p.Load("msgStream.receiveBufSize.searchMsgStream")
-	if err != nil {
-		panic(err)
-	}
-	bufSize, err := strconv.Atoi(revBufSize)
-	if err != nil {
-		panic(err)
-	}
-	return int64(bufSize)
-}
-
-func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 {
-	revBufSize, err := p.Load("msgStream.receiveBufSize.searchResultMsgStream")
+func (p *ParamTable) searchReceiveBufSize() int64 {
+	revBufSize, err := p.Load("reader.msgStream.search.recvBufSize")
 	if err != nil {
 		panic(err)
 	}
@@ -126,7 +141,7 @@ func (p *ParamTable) searchReceiveBufSize() int64 {
 }

 func (p *ParamTable) searchPulsarBufSize() int64 {
-	pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.search")
+	pulsarBufSize, err := p.Load("reader.msgStream.search.pulsarBufSize")
 	if err != nil {
 		panic(err)
 	}
@@ -137,12 +152,24 @@ func (p *ParamTable) searchPulsarBufSize() int64 {
 	return int64(bufSize)
 }

-func (p *ParamTable) dmPulsarBufSize() int64 {
-	pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.dm")
+func (p *ParamTable) searchResultReceiveBufSize() int64 {
+	revBufSize, err := p.Load("reader.msgStream.searchResult.recvBufSize")
 	if err != nil {
 		panic(err)
 	}
-	bufSize, err := strconv.Atoi(pulsarBufSize)
+	bufSize, err := strconv.Atoi(revBufSize)
+	if err != nil {
+		panic(err)
+	}
+	return int64(bufSize)
+}
+
+func (p *ParamTable) statsReceiveBufSize() int64 {
+	revBufSize, err := p.Load("reader.msgStream.stats.recvBufSize")
+	if err != nil {
+		panic(err)
+	}
+	bufSize, err := strconv.Atoi(revBufSize)
 	if err != nil {
 		panic(err)
 	}


@@ -12,56 +12,56 @@ func TestParamTable_Init(t *testing.T) {
 func TestParamTable_PulsarAddress(t *testing.T) {
 	Params.Init()
-	address, err := Params.PulsarAddress()
+	address, err := Params.pulsarAddress()
 	assert.NoError(t, err)
 	assert.Equal(t, address, "pulsar://localhost:6650")
 }

 func TestParamTable_QueryNodeID(t *testing.T) {
 	Params.Init()
-	id := Params.QueryNodeID()
+	id := Params.queryNodeID()
 	assert.Equal(t, id, 0)
 }

 func TestParamTable_TopicStart(t *testing.T) {
 	Params.Init()
-	topicStart := Params.TopicStart()
+	topicStart := Params.topicStart()
 	assert.Equal(t, topicStart, 0)
 }

 func TestParamTable_TopicEnd(t *testing.T) {
 	Params.Init()
-	topicEnd := Params.TopicEnd()
+	topicEnd := Params.topicEnd()
 	assert.Equal(t, topicEnd, 128)
 }

 func TestParamTable_statsServiceTimeInterval(t *testing.T) {
 	Params.Init()
-	interval := Params.statsServiceTimeInterval()
+	interval := Params.statsPublishInterval()
 	assert.Equal(t, interval, 1000)
 }

 func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) {
 	Params.Init()
-	bufSize := Params.statsMsgStreamReceiveBufSize()
+	bufSize := Params.statsReceiveBufSize()
 	assert.Equal(t, bufSize, int64(64))
 }

 func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) {
 	Params.Init()
-	bufSize := Params.dmMsgStreamReceiveBufSize()
+	bufSize := Params.dmReceiveBufSize()
 	assert.Equal(t, bufSize, int64(1024))
 }

 func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) {
 	Params.Init()
-	bufSize := Params.searchMsgStreamReceiveBufSize()
+	bufSize := Params.searchReceiveBufSize()
 	assert.Equal(t, bufSize, int64(512))
 }

 func TestParamTable_searchResultMsgStreamReceiveBufSize(t *testing.T) {
 	Params.Init()
-	bufSize := Params.searchResultMsgStreamReceiveBufSize()
+	bufSize := Params.searchResultReceiveBufSize()
 	assert.Equal(t, bufSize, int64(64))
 }
@@ -76,3 +76,15 @@ func TestParamTable_dmPulsarBufSize(t *testing.T) {
 	bufSize := Params.dmPulsarBufSize()
 	assert.Equal(t, bufSize, int64(1024))
 }
+
+func TestParamTable_flowGraphMaxQueueLength(t *testing.T) {
+	Params.Init()
+	length := Params.flowGraphMaxQueueLength()
+	assert.Equal(t, length, int32(1024))
+}
+
+func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
+	Params.Init()
+	maxParallelism := Params.flowGraphMaxParallelism()
+	assert.Equal(t, maxParallelism, int32(1024))
+}


@@ -35,10 +35,10 @@ type SearchResult struct {
 }

 func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
-	receiveBufSize := Params.searchMsgStreamReceiveBufSize()
+	receiveBufSize := Params.searchReceiveBufSize()
 	pulsarBufSize := Params.searchPulsarBufSize()

-	msgStreamURL, err := Params.PulsarAddress()
+	msgStreamURL, err := Params.pulsarAddress()
 	if err != nil {
 		log.Fatal(err)
 	}


@@ -28,11 +28,11 @@ func newStatsService(ctx context.Context, replica *collectionReplica) *statsServ
 }

 func (sService *statsService) start() {
-	sleepTimeInterval := Params.statsServiceTimeInterval()
-	receiveBufSize := Params.statsMsgStreamReceiveBufSize()
+	sleepTimeInterval := Params.statsPublishInterval()
+	receiveBufSize := Params.statsReceiveBufSize()

 	// start pulsar
-	msgStreamURL, err := Params.PulsarAddress()
+	msgStreamURL, err := Params.pulsarAddress()
 	if err != nil {
 		log.Fatal(err)
 	}


@@ -45,10 +45,7 @@ func (inNode *InputNode) Operate(in []*Msg) []*Msg {
 	return []*Msg{&msgStreamMsg}
 }

-func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode {
-	maxQueueLength := Params.FlowGraphMaxQueueLength()
-	maxParallelism := Params.FlowGraphMaxParallelism()
-
+func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)
 	baseNode.SetMaxParallelism(maxParallelism)


@@ -1,45 +0,0 @@
-package flowgraph
-
-import (
-	"strconv"
-
-	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
-)
-
-type ParamTable struct {
-	paramtable.BaseTable
-}
-
-var Params ParamTable
-
-func (p *ParamTable) Init() {
-	p.BaseTable.Init()
-	err := p.LoadYaml("advanced/flow_graph.yaml")
-	if err != nil {
-		panic(err)
-	}
-}
-
-func (p *ParamTable) FlowGraphMaxQueueLength() int32 {
-	queueLength, err := p.Load("flowGraph.maxQueueLength")
-	if err != nil {
-		panic(err)
-	}
-	length, err := strconv.Atoi(queueLength)
-	if err != nil {
-		panic(err)
-	}
-	return int32(length)
-}
-
-func (p *ParamTable) FlowGraphMaxParallelism() int32 {
-	maxParallelism, err := p.Load("flowGraph.maxParallelism")
-	if err != nil {
-		panic(err)
-	}
-	maxPara, err := strconv.Atoi(maxParallelism)
-	if err != nil {
-		panic(err)
-	}
-	return int32(maxPara)
-}


@@ -1,19 +0,0 @@
-package flowgraph
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestParamTable_flowGraphMaxQueueLength(t *testing.T) {
-	Params.Init()
-	length := Params.FlowGraphMaxQueueLength()
-	assert.Equal(t, length, int32(1024))
-}
-
-func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
-	Params.Init()
-	maxParallelism := Params.FlowGraphMaxParallelism()
-	assert.Equal(t, maxParallelism, int32(1024))
-}


@@ -12,7 +12,7 @@
 package paramtable

 import (
-	"fmt"
+	"log"
 	"path"
 	"runtime"
 	"strconv"
@@ -20,6 +20,7 @@ import (

 	"github.com/zilliztech/milvus-distributed/internal/kv"

+	"github.com/spf13/cast"
 	"github.com/spf13/viper"
 )
@@ -86,11 +87,34 @@ func (gp *BaseTable) LoadYaml(fileName string) error {
 	}

 	for _, key := range config.AllKeys() {
-		fmt.Println(key)
-		err := gp.params.Save(strings.ToLower(key), config.GetString(key))
+		val := config.Get(key)
+		str, err := cast.ToStringE(val)
+		if err != nil {
+			switch val := val.(type) {
+			case []interface{}:
+				str = str[:0]
+				for _, v := range val {
+					ss, err := cast.ToStringE(v)
+					if err != nil {
+						log.Panic(err)
+					}
+					if len(str) == 0 {
+						str = ss
+					} else {
+						str = str + "," + ss
+					}
+				}
+			default:
+				log.Panicf("undefined config type, key=%s", key)
+			}
+		}
+		log.Printf("%s : %s", key, str)
+		err = gp.params.Save(strings.ToLower(key), str)
 		if err != nil {
 			panic(err)
 		}
 	}

 	return nil
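This LoadYaml change is what makes the new master.yaml entries loadable at all: values that cast.ToStringE cannot convert directly (YAML lists such as proxyidlist: [1, 2]) are flattened into a comma-separated string, which is exactly the shape ProxyIDList and WriteIDList split back apart. A minimal standalone sketch of the flattening rule, using the same spf13/cast library (flatten itself is an illustrative helper, not repo code):

package main

import (
	"fmt"
	"log"

	"github.com/spf13/cast"
)

// flatten mirrors the conversion above: scalars go through cast.ToStringE,
// lists are joined with commas, anything else is rejected.
func flatten(val interface{}) string {
	str, err := cast.ToStringE(val)
	if err == nil {
		return str
	}
	list, ok := val.([]interface{})
	if !ok {
		log.Panicf("undefined config type: %T", val)
	}
	for i, v := range list {
		ss, err := cast.ToStringE(v)
		if err != nil {
			log.Panic(err)
		}
		if i == 0 {
			str = ss
		} else {
			str = str + "," + ss
		}
	}
	return str
}

func main() {
	fmt.Println(flatten([]interface{}{1, 2})) // "1,2" — what ProxyIDList later splits
	fmt.Println(flatten(53100))               // "53100"
}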


@@ -48,10 +48,16 @@ if __name__ == "__main__":
     node_names = ["Expr", "PlanNode"]

     visitor_info = {
-        'Expr': [{
-            'visitor_name': "ShowExprVisitor",
-            "parameter_name": 'expr',
-        }],
+        'Expr': [
+            {
+                'visitor_name': "ShowExprVisitor",
+                "parameter_name": 'expr',
+            },
+            {
+                'visitor_name': "ExecExprVisitor",
+                "parameter_name": 'expr',
+            },
+        ],
         'PlanNode': [
             {
                 'visitor_name': "ShowPlanNodeVisitor",


@@ -2,7 +2,7 @@
 @@root_base@@Visitor
 ####
 @@@@body@struct_name
-virtual void
+void
 visit(@@struct_name@@& @@parameter_name@@) override;
 ####
 @@@@main