mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Add framework of ExecExprVisitor
Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
parent
057673edc9
commit
e7dd30a884
14
configs/advanced/flow_graph.yaml
Normal file
14
configs/advanced/flow_graph.yaml
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
flowGraph:
|
||||||
|
maxQueueLength: 1024
|
||||||
|
maxParallelism: 1024
|
||||||
23
configs/advanced/reader.yaml
Normal file
23
configs/advanced/reader.yaml
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
service:
|
||||||
|
statsServiceTimeInterval: 1000 # milliseconds
|
||||||
|
|
||||||
|
msgStream:
|
||||||
|
receiveBufSize: # msgPack chan buffer size
|
||||||
|
statsMsgStream: 64
|
||||||
|
dmMsgStream: 1024
|
||||||
|
searchMsgStream: 512
|
||||||
|
searchResultMsgStream: 64
|
||||||
|
pulsarBufSize: # pulsar chan buffer size
|
||||||
|
search: 512
|
||||||
|
dm: 1024
|
||||||
@ -6,6 +6,7 @@ set(MILVUS_QUERY_SRCS
|
|||||||
visitors/ShowPlanNodeVisitor.cpp
|
visitors/ShowPlanNodeVisitor.cpp
|
||||||
visitors/ExecPlanNodeVisitor.cpp
|
visitors/ExecPlanNodeVisitor.cpp
|
||||||
visitors/ShowExprVisitor.cpp
|
visitors/ShowExprVisitor.cpp
|
||||||
|
visitors/ExecExprVisitor.cpp
|
||||||
Plan.cpp
|
Plan.cpp
|
||||||
)
|
)
|
||||||
add_library(milvus_query ${MILVUS_QUERY_SRCS})
|
add_library(milvus_query ${MILVUS_QUERY_SRCS})
|
||||||
|
|||||||
@ -22,18 +22,10 @@ using ExprPtr = std::unique_ptr<Expr>;
|
|||||||
struct BinaryExpr : Expr {
|
struct BinaryExpr : Expr {
|
||||||
ExprPtr left_;
|
ExprPtr left_;
|
||||||
ExprPtr right_;
|
ExprPtr right_;
|
||||||
|
|
||||||
public:
|
|
||||||
void
|
|
||||||
accept(ExprVisitor&) = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct UnaryExpr : Expr {
|
struct UnaryExpr : Expr {
|
||||||
ExprPtr child_;
|
ExprPtr child_;
|
||||||
|
|
||||||
public:
|
|
||||||
void
|
|
||||||
accept(ExprVisitor&) = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: not enabled in sprint 1
|
// TODO: not enabled in sprint 1
|
||||||
@ -60,7 +52,7 @@ using FieldId = std::string;
|
|||||||
|
|
||||||
struct TermExpr : Expr {
|
struct TermExpr : Expr {
|
||||||
FieldId field_id_;
|
FieldId field_id_;
|
||||||
DataType data_type_;
|
DataType data_type_ = DataType::NONE;
|
||||||
// std::vector<std::any> terms_;
|
// std::vector<std::any> terms_;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -74,7 +66,7 @@ struct TermExpr : Expr {
|
|||||||
|
|
||||||
struct RangeExpr : Expr {
|
struct RangeExpr : Expr {
|
||||||
FieldId field_id_;
|
FieldId field_id_;
|
||||||
DataType data_type_;
|
DataType data_type_ = DataType::NONE;
|
||||||
enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
|
enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
|
||||||
static const std::map<std::string, OpType> mapping_; // op_name -> op
|
static const std::map<std::string, OpType> mapping_; // op_name -> op
|
||||||
|
|
||||||
|
|||||||
@ -52,8 +52,10 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js
|
|||||||
expr->data_type_ = data_type;
|
expr->data_type_ = data_type;
|
||||||
expr->field_id_ = field_name;
|
expr->field_id_ = field_name;
|
||||||
for (auto& item : body.items()) {
|
for (auto& item : body.items()) {
|
||||||
auto& op_name = item.key();
|
auto op_name = to_lower(item.key());
|
||||||
auto op = RangeExpr::mapping_.at(to_lower(op_name));
|
|
||||||
|
AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found");
|
||||||
|
auto op = RangeExpr::mapping_.at(op_name);
|
||||||
T value = item.value();
|
T value = item.value();
|
||||||
expr->conditions_.emplace_back(op, value);
|
expr->conditions_.emplace_back(op, value);
|
||||||
}
|
}
|
||||||
@ -130,7 +132,6 @@ CreatePlan(const Schema& schema, const std::string& dsl_str) {
|
|||||||
|
|
||||||
std::unique_ptr<PlaceholderGroup>
|
std::unique_ptr<PlaceholderGroup>
|
||||||
ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
|
ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
|
||||||
(void)plan;
|
|
||||||
namespace ser = milvus::proto::service;
|
namespace ser = milvus::proto::service;
|
||||||
auto result = std::make_unique<PlaceholderGroup>();
|
auto result = std::make_unique<PlaceholderGroup>();
|
||||||
ser::PlaceholderGroup ph_group;
|
ser::PlaceholderGroup ph_group;
|
||||||
@ -139,9 +140,14 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
|
|||||||
for (auto& info : ph_group.placeholders()) {
|
for (auto& info : ph_group.placeholders()) {
|
||||||
Placeholder element;
|
Placeholder element;
|
||||||
element.tag_ = info.tag();
|
element.tag_ = info.tag();
|
||||||
|
Assert(plan->tag2field_.count(element.tag_));
|
||||||
|
auto field_id = plan->tag2field_.at(element.tag_);
|
||||||
|
auto& field_meta = plan->schema_[field_id];
|
||||||
element.num_of_queries_ = info.values_size();
|
element.num_of_queries_ = info.values_size();
|
||||||
AssertInfo(element.num_of_queries_, "must have queries");
|
AssertInfo(element.num_of_queries_, "must have queries");
|
||||||
|
Assert(element.num_of_queries_ > 0);
|
||||||
element.line_sizeof_ = info.values().Get(0).size();
|
element.line_sizeof_ = info.values().Get(0).size();
|
||||||
|
Assert(field_meta.get_sizeof() == element.line_sizeof_);
|
||||||
auto& target = element.blob_;
|
auto& target = element.blob_;
|
||||||
target.reserve(element.line_sizeof_ * element.num_of_queries_);
|
target.reserve(element.line_sizeof_ * element.num_of_queries_);
|
||||||
for (auto& line : info.values()) {
|
for (auto& line : info.values()) {
|
||||||
|
|||||||
@ -14,7 +14,7 @@ using Json = nlohmann::json;
|
|||||||
// class definitions
|
// class definitions
|
||||||
struct Plan {
|
struct Plan {
|
||||||
public:
|
public:
|
||||||
Plan(const Schema& schema) : schema_(schema) {
|
explicit Plan(const Schema& schema) : schema_(schema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -38,10 +38,6 @@ struct VectorPlanNode : PlanNode {
|
|||||||
std::optional<ExprPtr> predicate_;
|
std::optional<ExprPtr> predicate_;
|
||||||
QueryInfo query_info_;
|
QueryInfo query_info_;
|
||||||
std::string placeholder_tag_;
|
std::string placeholder_tag_;
|
||||||
|
|
||||||
public:
|
|
||||||
virtual void
|
|
||||||
accept(PlanNodeVisitor&) = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct FloatVectorANNS : VectorPlanNode {
|
struct FloatVectorANNS : VectorPlanNode {
|
||||||
|
|||||||
25
internal/core/src/query/generated/ExecExprVisitor.cpp
Normal file
25
internal/core/src/query/generated/ExecExprVisitor.cpp
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
#error TODO: copy this file out, and modify the content.
|
||||||
|
#include "query/generated/ExecExprVisitor.h"
|
||||||
|
|
||||||
|
namespace milvus::query {
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(BoolUnaryExpr& expr) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(BoolBinaryExpr& expr) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(TermExpr& expr) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(RangeExpr& expr) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace milvus::query
|
||||||
41
internal/core/src/query/generated/ExecExprVisitor.h
Normal file
41
internal/core/src/query/generated/ExecExprVisitor.h
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
#pragma once
|
||||||
|
// Generated File
|
||||||
|
// DO NOT EDIT
|
||||||
|
#include "segcore/SegmentNaive.h"
|
||||||
|
#include <optional>
|
||||||
|
#include "ExprVisitor.h"
|
||||||
|
|
||||||
|
namespace milvus::query {
|
||||||
|
class ExecExprVisitor : ExprVisitor {
|
||||||
|
public:
|
||||||
|
void
|
||||||
|
visit(BoolUnaryExpr& expr) override;
|
||||||
|
|
||||||
|
void
|
||||||
|
visit(BoolBinaryExpr& expr) override;
|
||||||
|
|
||||||
|
void
|
||||||
|
visit(TermExpr& expr) override;
|
||||||
|
|
||||||
|
void
|
||||||
|
visit(RangeExpr& expr) override;
|
||||||
|
|
||||||
|
public:
|
||||||
|
using RetType = faiss::ConcurrentBitsetPtr;
|
||||||
|
explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) {
|
||||||
|
}
|
||||||
|
RetType
|
||||||
|
call_child(Expr& expr) {
|
||||||
|
Assert(!ret_.has_value());
|
||||||
|
expr.accept(*this);
|
||||||
|
Assert(ret_.has_value());
|
||||||
|
auto ret = std::move(ret_);
|
||||||
|
ret_ = std::nullopt;
|
||||||
|
return std::move(ret.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
segcore::SegmentNaive& segment_;
|
||||||
|
std::optional<RetType> ret_;
|
||||||
|
};
|
||||||
|
} // namespace milvus::query
|
||||||
@ -9,10 +9,10 @@
|
|||||||
namespace milvus::query {
|
namespace milvus::query {
|
||||||
class ExecPlanNodeVisitor : PlanNodeVisitor {
|
class ExecPlanNodeVisitor : PlanNodeVisitor {
|
||||||
public:
|
public:
|
||||||
virtual void
|
void
|
||||||
visit(FloatVectorANNS& node) override;
|
visit(FloatVectorANNS& node) override;
|
||||||
|
|
||||||
virtual void
|
void
|
||||||
visit(BinaryVectorANNS& node) override;
|
visit(BinaryVectorANNS& node) override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -9,16 +9,16 @@
|
|||||||
namespace milvus::query {
|
namespace milvus::query {
|
||||||
class ShowExprVisitor : ExprVisitor {
|
class ShowExprVisitor : ExprVisitor {
|
||||||
public:
|
public:
|
||||||
virtual void
|
void
|
||||||
visit(BoolUnaryExpr& expr) override;
|
visit(BoolUnaryExpr& expr) override;
|
||||||
|
|
||||||
virtual void
|
void
|
||||||
visit(BoolBinaryExpr& expr) override;
|
visit(BoolBinaryExpr& expr) override;
|
||||||
|
|
||||||
virtual void
|
void
|
||||||
visit(TermExpr& expr) override;
|
visit(TermExpr& expr) override;
|
||||||
|
|
||||||
virtual void
|
void
|
||||||
visit(RangeExpr& expr) override;
|
visit(RangeExpr& expr) override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -10,10 +10,10 @@
|
|||||||
namespace milvus::query {
|
namespace milvus::query {
|
||||||
class ShowPlanNodeVisitor : PlanNodeVisitor {
|
class ShowPlanNodeVisitor : PlanNodeVisitor {
|
||||||
public:
|
public:
|
||||||
virtual void
|
void
|
||||||
visit(FloatVectorANNS& node) override;
|
visit(FloatVectorANNS& node) override;
|
||||||
|
|
||||||
virtual void
|
void
|
||||||
visit(BinaryVectorANNS& node) override;
|
visit(BinaryVectorANNS& node) override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
51
internal/core/src/query/visitors/ExecExprVisitor.cpp
Normal file
51
internal/core/src/query/visitors/ExecExprVisitor.cpp
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
#include "segcore/SegmentNaive.h"
|
||||||
|
#include <optional>
|
||||||
|
#include "query/generated/ExecExprVisitor.h"
|
||||||
|
|
||||||
|
namespace milvus::query {
|
||||||
|
#if 1
|
||||||
|
// THIS CONTAINS EXTRA BODY FOR VISITOR
|
||||||
|
// WILL BE USED BY GENERATOR
|
||||||
|
namespace impl {
|
||||||
|
class ExecExprVisitor : ExprVisitor {
|
||||||
|
public:
|
||||||
|
using RetType = faiss::ConcurrentBitsetPtr;
|
||||||
|
explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) {
|
||||||
|
}
|
||||||
|
RetType
|
||||||
|
call_child(Expr& expr) {
|
||||||
|
Assert(!ret_.has_value());
|
||||||
|
expr.accept(*this);
|
||||||
|
Assert(ret_.has_value());
|
||||||
|
auto ret = std::move(ret_);
|
||||||
|
ret_ = std::nullopt;
|
||||||
|
return std::move(ret.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
segcore::SegmentNaive& segment_;
|
||||||
|
std::optional<RetType> ret_;
|
||||||
|
};
|
||||||
|
} // namespace impl
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(BoolUnaryExpr& expr) {
|
||||||
|
PanicInfo("unimplemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(BoolBinaryExpr& expr) {
|
||||||
|
PanicInfo("unimplemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(TermExpr& expr) {
|
||||||
|
PanicInfo("unimplemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(RangeExpr& expr) {
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace milvus::query
|
||||||
@ -3,7 +3,6 @@ set(SEGCORE_FILES
|
|||||||
SegmentNaive.cpp
|
SegmentNaive.cpp
|
||||||
SegmentSmallIndex.cpp
|
SegmentSmallIndex.cpp
|
||||||
IndexMeta.cpp
|
IndexMeta.cpp
|
||||||
ConcurrentVector.cpp
|
|
||||||
Collection.cpp
|
Collection.cpp
|
||||||
collection_c.cpp
|
collection_c.cpp
|
||||||
segment_c.cpp
|
segment_c.cpp
|
||||||
|
|||||||
@ -4,7 +4,6 @@
|
|||||||
#include "pb/etcd_meta.pb.h"
|
#include "pb/etcd_meta.pb.h"
|
||||||
#include <google/protobuf/text_format.h>
|
#include <google/protobuf/text_format.h>
|
||||||
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
|
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
namespace milvus::segcore {
|
namespace milvus::segcore {
|
||||||
|
|
||||||
@ -134,7 +133,7 @@ Collection::parse() {
|
|||||||
auto schema = std::make_shared<Schema>();
|
auto schema = std::make_shared<Schema>();
|
||||||
for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) {
|
for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) {
|
||||||
const auto& type_params = child.type_params();
|
const auto& type_params = child.type_params();
|
||||||
int dim = 16;
|
int64_t dim = 16;
|
||||||
for (const auto& type_param : type_params) {
|
for (const auto& type_param : type_params) {
|
||||||
if (type_param.key() == "dim") {
|
if (type_param.key() == "dim") {
|
||||||
dim = strtoll(type_param.value().c_str(), nullptr, 10);
|
dim = strtoll(type_param.value().c_str(), nullptr, 10);
|
||||||
|
|||||||
@ -1,5 +0,0 @@
|
|||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include "segcore/ConcurrentVector.h"
|
|
||||||
|
|
||||||
namespace milvus::segcore {}
|
|
||||||
@ -11,7 +11,8 @@ IndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBas
|
|||||||
assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT);
|
assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT);
|
||||||
auto dim = field_meta_.get_dim();
|
auto dim = field_meta_.get_dim();
|
||||||
|
|
||||||
auto source = static_cast<const ConcurrentVector<float>*>(vec_base);
|
auto source = dynamic_cast<const ConcurrentVector<float>*>(vec_base);
|
||||||
|
Assert(source);
|
||||||
auto chunk_size = source->chunk_size();
|
auto chunk_size = source->chunk_size();
|
||||||
assert(ack_end <= chunk_size);
|
assert(ack_end <= chunk_size);
|
||||||
auto conf = get_build_conf();
|
auto conf = get_build_conf();
|
||||||
|
|||||||
@ -451,10 +451,10 @@ SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp ti
|
|||||||
Status
|
Status
|
||||||
SegmentNaive::Close() {
|
SegmentNaive::Close() {
|
||||||
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
|
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
|
||||||
std::runtime_error("insert not ready");
|
PanicInfo("insert not ready");
|
||||||
}
|
}
|
||||||
if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) {
|
if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
|
||||||
std::runtime_error("delete not ready");
|
PanicInfo("delete not ready");
|
||||||
}
|
}
|
||||||
state_ = SegmentState::Closed;
|
state_ = SegmentState::Closed;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
|||||||
@ -20,8 +20,6 @@
|
|||||||
namespace milvus::segcore {
|
namespace milvus::segcore {
|
||||||
class SegmentNaive : public SegmentBase {
|
class SegmentNaive : public SegmentBase {
|
||||||
public:
|
public:
|
||||||
virtual ~SegmentNaive() = default;
|
|
||||||
|
|
||||||
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
|
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
|
||||||
|
|
||||||
int64_t
|
int64_t
|
||||||
@ -47,7 +45,7 @@ class SegmentNaive : public SegmentBase {
|
|||||||
Status
|
Status
|
||||||
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
|
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
|
||||||
|
|
||||||
virtual Status
|
Status
|
||||||
Search(const query::Plan* Plan,
|
Search(const query::Plan* Plan,
|
||||||
const query::PlaceholderGroup* placeholder_groups[],
|
const query::PlaceholderGroup* placeholder_groups[],
|
||||||
const Timestamp timestamps[],
|
const Timestamp timestamps[],
|
||||||
@ -106,7 +104,7 @@ class SegmentNaive : public SegmentBase {
|
|||||||
friend std::unique_ptr<SegmentBase>
|
friend std::unique_ptr<SegmentBase>
|
||||||
CreateSegment(SchemaPtr schema);
|
CreateSegment(SchemaPtr schema);
|
||||||
|
|
||||||
explicit SegmentNaive(SchemaPtr schema) : schema_(schema), record_(*schema) {
|
explicit SegmentNaive(const SchemaPtr& schema) : schema_(schema), record_(*schema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -50,7 +50,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier,
|
|||||||
for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
|
for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
|
||||||
// get uid in delete logs
|
// get uid in delete logs
|
||||||
auto uid = deleted_record_.uids_[del_index];
|
auto uid = deleted_record_.uids_[del_index];
|
||||||
// map uid to corrensponding offsets, select the max one, which should be the target
|
// map uid to corresponding offsets, select the max one, which should be the target
|
||||||
// the max one should be closest to query_timestamp, so the delete log should refer to it
|
// the max one should be closest to query_timestamp, so the delete log should refer to it
|
||||||
int64_t the_offset = -1;
|
int64_t the_offset = -1;
|
||||||
auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
|
auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
|
||||||
@ -73,7 +73,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier,
|
|||||||
for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
|
for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
|
||||||
// get uid in delete logs
|
// get uid in delete logs
|
||||||
auto uid = deleted_record_.uids_[del_index];
|
auto uid = deleted_record_.uids_[del_index];
|
||||||
// map uid to corrensponding offsets, select the max one, which should be the target
|
// map uid to corresponding offsets, select the max one, which should be the target
|
||||||
// the max one should be closest to query_timestamp, so the delete log should refer to it
|
// the max one should be closest to query_timestamp, so the delete log should refer to it
|
||||||
int64_t the_offset = -1;
|
int64_t the_offset = -1;
|
||||||
auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
|
auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
|
||||||
@ -228,7 +228,7 @@ SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info,
|
|||||||
QueryResult& results) {
|
QueryResult& results) {
|
||||||
// step 1: binary search to find the barrier of the snapshot
|
// step 1: binary search to find the barrier of the snapshot
|
||||||
auto ins_barrier = get_barrier(record_, timestamp);
|
auto ins_barrier = get_barrier(record_, timestamp);
|
||||||
auto del_barrier = get_barrier(deleted_record_, timestamp);
|
// auto del_barrier = get_barrier(deleted_record_, timestamp);
|
||||||
#if 0
|
#if 0
|
||||||
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
|
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
|
||||||
Assert(bitmap_holder);
|
Assert(bitmap_holder);
|
||||||
@ -321,7 +321,6 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta
|
|||||||
x = dis(e);
|
x = dis(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int64_t inferred_dim = query_info->query_raw_data.size() / query_info->num_queries;
|
|
||||||
// TODO
|
// TODO
|
||||||
query::QueryInfo info{
|
query::QueryInfo info{
|
||||||
query_info->topK,
|
query_info->topK,
|
||||||
@ -338,10 +337,10 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta
|
|||||||
Status
|
Status
|
||||||
SegmentSmallIndex::Close() {
|
SegmentSmallIndex::Close() {
|
||||||
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
|
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
|
||||||
std::runtime_error("insert not ready");
|
PanicInfo("insert not ready");
|
||||||
}
|
}
|
||||||
if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) {
|
if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
|
||||||
std::runtime_error("delete not ready");
|
PanicInfo("delete not ready");
|
||||||
}
|
}
|
||||||
state_ = SegmentState::Closed;
|
state_ = SegmentState::Closed;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
@ -357,8 +356,6 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
|
|||||||
auto dim = field.get_dim();
|
auto dim = field.get_dim();
|
||||||
|
|
||||||
auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
|
auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
|
||||||
auto chunk_size = record_.uids_.chunk_size();
|
|
||||||
|
|
||||||
auto& uids = record_.uids_;
|
auto& uids = record_.uids_;
|
||||||
auto entities = record_.get_vec_entity<float>(offset);
|
auto entities = record_.get_vec_entity<float>(offset);
|
||||||
|
|
||||||
@ -398,7 +395,7 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) {
|
|||||||
|
|
||||||
auto index_meta = std::make_shared<IndexMeta>(schema_);
|
auto index_meta = std::make_shared<IndexMeta>(schema_);
|
||||||
// TODO: this is merge of query conf and insert conf
|
// TODO: this is merge of query conf and insert conf
|
||||||
// TODO: should be splitted into multiple configs
|
// TODO: should be split into multiple configs
|
||||||
auto conf = milvus::knowhere::Config{
|
auto conf = milvus::knowhere::Config{
|
||||||
{milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100},
|
{milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100},
|
||||||
{milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4},
|
{milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4},
|
||||||
@ -431,8 +428,16 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
index_ready_ = true;
|
index_ready_ = true;
|
||||||
#endif
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint64_t
|
||||||
|
upper_align(int64_t value, int64_t align) {
|
||||||
|
Assert(align > 0);
|
||||||
|
Assert((align & (align - 1)) == 0);
|
||||||
|
auto groups = (value + align - 1) / align;
|
||||||
|
return groups * align;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t
|
int64_t
|
||||||
@ -448,9 +453,9 @@ SegmentSmallIndex::GetMemoryUsageInBytes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
|
int64_t ins_n = upper_align(record_.reserved, DefaultElementPerChunk);
|
||||||
total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1);
|
total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1);
|
||||||
int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
|
int64_t del_n = upper_align(deleted_record_.reserved, DefaultElementPerChunk);
|
||||||
total_bytes += del_n * (16 * 2);
|
total_bytes += del_n * (16 * 2);
|
||||||
return total_bytes;
|
return total_bytes;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,10 +46,6 @@ namespace milvus::segcore {
|
|||||||
|
|
||||||
class SegmentSmallIndex : public SegmentBase {
|
class SegmentSmallIndex : public SegmentBase {
|
||||||
public:
|
public:
|
||||||
virtual ~SegmentSmallIndex() = default;
|
|
||||||
|
|
||||||
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
|
|
||||||
|
|
||||||
int64_t
|
int64_t
|
||||||
PreInsert(int64_t size) override;
|
PreInsert(int64_t size) override;
|
||||||
|
|
||||||
@ -111,6 +107,9 @@ class SegmentSmallIndex : public SegmentBase {
|
|||||||
GetMemoryUsageInBytes() override;
|
GetMemoryUsageInBytes() override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
void
|
||||||
|
get_insert_record();
|
||||||
|
|
||||||
ssize_t
|
ssize_t
|
||||||
get_row_count() const override {
|
get_row_count() const override {
|
||||||
return record_.ack_responder_.GetAck();
|
return record_.ack_responder_.GetAck();
|
||||||
@ -130,7 +129,8 @@ class SegmentSmallIndex : public SegmentBase {
|
|||||||
friend std::unique_ptr<SegmentBase>
|
friend std::unique_ptr<SegmentBase>
|
||||||
CreateSegment(SchemaPtr schema);
|
CreateSegment(SchemaPtr schema);
|
||||||
|
|
||||||
explicit SegmentSmallIndex(SchemaPtr schema) : schema_(schema), record_(*schema_), indexing_record_(*schema_) {
|
explicit SegmentSmallIndex(SchemaPtr schema)
|
||||||
|
: schema_(std::move(schema)), record_(*schema_), indexing_record_(*schema_) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -141,7 +141,7 @@ TEST(CApiTest, SearchTest) {
|
|||||||
auto blob = raw_group.SerializeAsString();
|
auto blob = raw_group.SerializeAsString();
|
||||||
|
|
||||||
auto plan = CreatePlan(collection, dsl_string);
|
auto plan = CreatePlan(collection, dsl_string);
|
||||||
auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length());
|
auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
|
||||||
std::vector<CPlaceholderGroup> placeholderGroups;
|
std::vector<CPlaceholderGroup> placeholderGroups;
|
||||||
placeholderGroups.push_back(placeholderGroup);
|
placeholderGroups.push_back(placeholderGroup);
|
||||||
timestamps.clear();
|
timestamps.clear();
|
||||||
@ -228,7 +228,7 @@ TEST(CApiTest, BuildIndexTest) {
|
|||||||
auto blob = raw_group.SerializeAsString();
|
auto blob = raw_group.SerializeAsString();
|
||||||
|
|
||||||
auto plan = CreatePlan(collection, dsl_string);
|
auto plan = CreatePlan(collection, dsl_string);
|
||||||
auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length());
|
auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
|
||||||
std::vector<CPlaceholderGroup> placeholderGroups;
|
std::vector<CPlaceholderGroup> placeholderGroups;
|
||||||
placeholderGroups.push_back(placeholderGroup);
|
placeholderGroups.push_back(placeholderGroup);
|
||||||
timestamps.clear();
|
timestamps.clear();
|
||||||
|
|||||||
@ -133,7 +133,7 @@ TEST(Query, ParsePlaceholderGroup) {
|
|||||||
{
|
{
|
||||||
"bool": {
|
"bool": {
|
||||||
"vector": {
|
"vector": {
|
||||||
"Vec": {
|
"fakevec": {
|
||||||
"metric_type": "L2",
|
"metric_type": "L2",
|
||||||
"params": {
|
"params": {
|
||||||
"nprobe": 10
|
"nprobe": 10
|
||||||
|
|||||||
@ -37,6 +37,7 @@ func (dsService *dataSyncService) initNodes() {
|
|||||||
// TODO: add delete pipeline support
|
// TODO: add delete pipeline support
|
||||||
|
|
||||||
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
|
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
|
||||||
|
flowgraph.Params.Init()
|
||||||
|
|
||||||
var dmStreamNode Node = newDmInputNode(dsService.ctx)
|
var dmStreamNode Node = newDmInputNode(dsService.ctx)
|
||||||
var filterDmNode Node = newFilteredDmNode()
|
var filterDmNode Node = newFilteredDmNode()
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package reader
|
package reader
|
||||||
|
|
||||||
|
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
|
|
||||||
type deleteNode struct {
|
type deleteNode struct {
|
||||||
BaseNode
|
BaseNode
|
||||||
deleteMsg deleteMsg
|
deleteMsg deleteMsg
|
||||||
@ -14,6 +16,9 @@ func (dNode *deleteNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newDeleteNode() *deleteNode {
|
func newDeleteNode() *deleteNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
|
|
||||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
)
|
)
|
||||||
|
|
||||||
type filterDmNode struct {
|
type filterDmNode struct {
|
||||||
@ -54,6 +55,9 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newFilteredDmNode() *filterDmNode {
|
func newFilteredDmNode() *filterDmNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
)
|
)
|
||||||
|
|
||||||
type insertNode struct {
|
type insertNode struct {
|
||||||
@ -126,6 +127,9 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newInsertNode(replica *collectionReplica) *insertNode {
|
func newInsertNode(replica *collectionReplica) *insertNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package reader
|
package reader
|
||||||
|
|
||||||
|
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
|
|
||||||
type key2SegNode struct {
|
type key2SegNode struct {
|
||||||
BaseNode
|
BaseNode
|
||||||
key2SegMsg key2SegMsg
|
key2SegMsg key2SegMsg
|
||||||
@ -14,6 +16,9 @@ func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newKey2SegNode() *key2SegNode {
|
func newKey2SegNode() *key2SegNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -9,10 +9,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
|
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
|
||||||
const (
|
receiveBufSize := Params.dmMsgStreamReceiveBufSize()
|
||||||
receiveBufSize = 1024
|
pulsarBufSize := Params.dmPulsarBufSize()
|
||||||
pulsarBufSize = 1024
|
|
||||||
)
|
|
||||||
|
|
||||||
msgStreamURL, err := Params.PulsarAddress()
|
msgStreamURL, err := Params.PulsarAddress()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -2,9 +2,6 @@ package reader
|
|||||||
|
|
||||||
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
|
|
||||||
const maxQueueLength = flowgraph.MaxQueueLength
|
|
||||||
const maxParallelism = flowgraph.MaxQueueLength
|
|
||||||
|
|
||||||
type BaseNode = flowgraph.BaseNode
|
type BaseNode = flowgraph.BaseNode
|
||||||
type Node = flowgraph.Node
|
type Node = flowgraph.Node
|
||||||
type InputNode = flowgraph.InputNode
|
type InputNode = flowgraph.InputNode
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package reader
|
package reader
|
||||||
|
|
||||||
|
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
|
|
||||||
type schemaUpdateNode struct {
|
type schemaUpdateNode struct {
|
||||||
BaseNode
|
BaseNode
|
||||||
schemaUpdateMsg schemaUpdateMsg
|
schemaUpdateMsg schemaUpdateMsg
|
||||||
@ -14,6 +16,9 @@ func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newSchemaUpdateNode() *schemaUpdateNode {
|
func newSchemaUpdateNode() *schemaUpdateNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -2,6 +2,8 @@ package reader
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||||
)
|
)
|
||||||
|
|
||||||
type serviceTimeNode struct {
|
type serviceTimeNode struct {
|
||||||
@ -33,6 +35,9 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
|
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
|
||||||
|
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(maxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|||||||
@ -12,8 +12,12 @@ type ParamTable struct {
|
|||||||
|
|
||||||
var Params ParamTable
|
var Params ParamTable
|
||||||
|
|
||||||
func (p *ParamTable) InitParamTable() {
|
func (p *ParamTable) Init() {
|
||||||
p.Init()
|
p.BaseTable.Init()
|
||||||
|
err := p.LoadYaml("advanced/reader.yaml")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ParamTable) PulsarAddress() (string, error) {
|
func (p *ParamTable) PulsarAddress() (string, error) {
|
||||||
@ -25,7 +29,10 @@ func (p *ParamTable) PulsarAddress() (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ParamTable) QueryNodeID() int {
|
func (p *ParamTable) QueryNodeID() int {
|
||||||
queryNodeID, _ := p.Load("reader.clientid")
|
queryNodeID, err := p.Load("reader.clientid")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
id, err := strconv.Atoi(queryNodeID)
|
id, err := strconv.Atoi(queryNodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -34,7 +41,10 @@ func (p *ParamTable) QueryNodeID() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ParamTable) TopicStart() int {
|
func (p *ParamTable) TopicStart() int {
|
||||||
topicStart, _ := p.Load("reader.topicstart")
|
topicStart, err := p.Load("reader.topicstart")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
topicStartNum, err := strconv.Atoi(topicStart)
|
topicStartNum, err := strconv.Atoi(topicStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -43,10 +53,98 @@ func (p *ParamTable) TopicStart() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ParamTable) TopicEnd() int {
|
func (p *ParamTable) TopicEnd() int {
|
||||||
topicEnd, _ := p.Load("reader.topicend")
|
topicEnd, err := p.Load("reader.topicend")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
topicEndNum, err := strconv.Atoi(topicEnd)
|
topicEndNum, err := strconv.Atoi(topicEnd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return topicEndNum
|
return topicEndNum
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// private advanced params
|
||||||
|
func (p *ParamTable) statsServiceTimeInterval() int {
|
||||||
|
timeInterval, err := p.Load("service.statsServiceTimeInterval")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
interval, err := strconv.Atoi(timeInterval)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return interval
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 {
|
||||||
|
revBufSize, err := p.Load("msgStream.receiveBufSize.statsMsgStream")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(revBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) dmMsgStreamReceiveBufSize() int64 {
|
||||||
|
revBufSize, err := p.Load("msgStream.receiveBufSize.dmMsgStream")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(revBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) searchMsgStreamReceiveBufSize() int64 {
|
||||||
|
revBufSize, err := p.Load("msgStream.receiveBufSize.searchMsgStream")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(revBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 {
|
||||||
|
revBufSize, err := p.Load("msgStream.receiveBufSize.searchResultMsgStream")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(revBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) searchPulsarBufSize() int64 {
|
||||||
|
pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.search")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(pulsarBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) dmPulsarBufSize() int64 {
|
||||||
|
pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.dm")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bufSize, err := strconv.Atoi(pulsarBufSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int64(bufSize)
|
||||||
|
}
|
||||||
|
|||||||
@ -6,20 +6,73 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestParamTable_Init(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_PulsarAddress(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
address, err := Params.PulsarAddress()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, address, "pulsar://localhost:6650")
|
||||||
|
}
|
||||||
|
|
||||||
func TestParamTable_QueryNodeID(t *testing.T) {
|
func TestParamTable_QueryNodeID(t *testing.T) {
|
||||||
Params.InitParamTable()
|
Params.Init()
|
||||||
id := Params.QueryNodeID()
|
id := Params.QueryNodeID()
|
||||||
assert.Equal(t, id, 0)
|
assert.Equal(t, id, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParamTable_TopicStart(t *testing.T) {
|
func TestParamTable_TopicStart(t *testing.T) {
|
||||||
Params.InitParamTable()
|
Params.Init()
|
||||||
topicStart := Params.TopicStart()
|
topicStart := Params.TopicStart()
|
||||||
assert.Equal(t, topicStart, 0)
|
assert.Equal(t, topicStart, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParamTable_TopicEnd(t *testing.T) {
|
func TestParamTable_TopicEnd(t *testing.T) {
|
||||||
Params.InitParamTable()
|
Params.Init()
|
||||||
topicEnd := Params.TopicEnd()
|
topicEnd := Params.TopicEnd()
|
||||||
assert.Equal(t, topicEnd, 128)
|
assert.Equal(t, topicEnd, 128)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParamTable_statsServiceTimeInterval(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
interval := Params.statsServiceTimeInterval()
|
||||||
|
assert.Equal(t, interval, 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.statsMsgStreamReceiveBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(64))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.dmMsgStreamReceiveBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(1024))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.searchMsgStreamReceiveBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(512))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_searchResultMsgStreamReceiveBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.searchResultMsgStreamReceiveBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(64))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_searchPulsarBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.searchPulsarBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(512))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_dmPulsarBufSize(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
bufSize := Params.dmPulsarBufSize()
|
||||||
|
assert.Equal(t, bufSize, int64(1024))
|
||||||
|
}
|
||||||
|
|||||||
@ -35,11 +35,8 @@ type SearchResult struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
|
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
|
||||||
const (
|
receiveBufSize := Params.searchMsgStreamReceiveBufSize()
|
||||||
//TODO:: read config file
|
pulsarBufSize := Params.searchPulsarBufSize()
|
||||||
receiveBufSize = 1024
|
|
||||||
pulsarBufSize = 1024
|
|
||||||
)
|
|
||||||
|
|
||||||
msgStreamURL, err := Params.PulsarAddress()
|
msgStreamURL, err := Params.PulsarAddress()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -28,10 +28,8 @@ func newStatsService(ctx context.Context, replica *collectionReplica) *statsServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sService *statsService) start() {
|
func (sService *statsService) start() {
|
||||||
const (
|
sleepTimeInterval := Params.statsServiceTimeInterval()
|
||||||
receiveBufSize = 1024
|
receiveBufSize := Params.statsMsgStreamReceiveBufSize()
|
||||||
sleepMillisecondTime = 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
// start pulsar
|
// start pulsar
|
||||||
msgStreamURL, err := Params.PulsarAddress()
|
msgStreamURL, err := Params.PulsarAddress()
|
||||||
@ -50,12 +48,12 @@ func (sService *statsService) start() {
|
|||||||
(*sService.statsStream).Start()
|
(*sService.statsStream).Start()
|
||||||
|
|
||||||
// start service
|
// start service
|
||||||
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
|
fmt.Println("do segments statistic in ", strconv.Itoa(sleepTimeInterval), "ms")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sService.ctx.Done():
|
case <-sService.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-time.After(sleepMillisecondTime * time.Millisecond):
|
case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond):
|
||||||
sService.sendSegmentStatistic()
|
sService.sendSegmentStatistic()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -175,6 +175,9 @@ func receiveResult(ctx context.Context, fg *TimeTickedFlowGraph) (float64, bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestTimeTickedFlowGraph_Start(t *testing.T) {
|
func TestTimeTickedFlowGraph_Start(t *testing.T) {
|
||||||
|
const MaxQueueLength = 1024
|
||||||
|
const MaxParallelism = 1024
|
||||||
|
|
||||||
duration := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
|
duration := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
|
||||||
ctx, cancel := context.WithDeadline(context.Background(), duration)
|
ctx, cancel := context.WithDeadline(context.Background(), duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@ -46,9 +46,12 @@ func (inNode *InputNode) Operate(in []*Msg) []*Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode {
|
func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode {
|
||||||
|
maxQueueLength := Params.FlowGraphMaxQueueLength()
|
||||||
|
maxParallelism := Params.FlowGraphMaxParallelism()
|
||||||
|
|
||||||
baseNode := BaseNode{}
|
baseNode := BaseNode{}
|
||||||
baseNode.SetMaxQueueLength(MaxQueueLength)
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||||
baseNode.SetMaxParallelism(MaxParallelism)
|
baseNode.SetMaxParallelism(maxParallelism)
|
||||||
|
|
||||||
return &InputNode{
|
return &InputNode{
|
||||||
BaseNode: baseNode,
|
BaseNode: baseNode,
|
||||||
|
|||||||
45
internal/util/flowgraph/param_table.go
Normal file
45
internal/util/flowgraph/param_table.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package flowgraph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ParamTable struct {
|
||||||
|
paramtable.BaseTable
|
||||||
|
}
|
||||||
|
|
||||||
|
var Params ParamTable
|
||||||
|
|
||||||
|
func (p *ParamTable) Init() {
|
||||||
|
p.BaseTable.Init()
|
||||||
|
err := p.LoadYaml("advanced/flow_graph.yaml")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) FlowGraphMaxQueueLength() int32 {
|
||||||
|
queueLength, err := p.Load("flowGraph.maxQueueLength")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
length, err := strconv.Atoi(queueLength)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int32(length)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) FlowGraphMaxParallelism() int32 {
|
||||||
|
maxParallelism, err := p.Load("flowGraph.maxParallelism")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
maxPara, err := strconv.Atoi(maxParallelism)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return int32(maxPara)
|
||||||
|
}
|
||||||
19
internal/util/flowgraph/param_table_test.go
Normal file
19
internal/util/flowgraph/param_table_test.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package flowgraph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParamTable_flowGraphMaxQueueLength(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
length := Params.FlowGraphMaxQueueLength()
|
||||||
|
assert.Equal(t, length, int32(1024))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
maxParallelism := Params.FlowGraphMaxParallelism()
|
||||||
|
assert.Equal(t, maxParallelism, int32(1024))
|
||||||
|
}
|
||||||
@ -4,6 +4,3 @@ import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
|||||||
|
|
||||||
type Timestamp = typeutil.Timestamp
|
type Timestamp = typeutil.Timestamp
|
||||||
type NodeName = string
|
type NodeName = string
|
||||||
|
|
||||||
const MaxQueueLength = 1024
|
|
||||||
const MaxParallelism = 1024
|
|
||||||
|
|||||||
@ -48,10 +48,16 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
node_names = ["Expr", "PlanNode"]
|
node_names = ["Expr", "PlanNode"]
|
||||||
visitor_info = {
|
visitor_info = {
|
||||||
'Expr': [{
|
'Expr': [
|
||||||
|
{
|
||||||
'visitor_name': "ShowExprVisitor",
|
'visitor_name': "ShowExprVisitor",
|
||||||
"parameter_name": 'expr',
|
"parameter_name": 'expr',
|
||||||
}],
|
},
|
||||||
|
{
|
||||||
|
'visitor_name': "ExecExprVisitor",
|
||||||
|
"parameter_name": 'expr',
|
||||||
|
},
|
||||||
|
],
|
||||||
'PlanNode': [
|
'PlanNode': [
|
||||||
{
|
{
|
||||||
'visitor_name': "ShowPlanNodeVisitor",
|
'visitor_name': "ShowPlanNodeVisitor",
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
@@root_base@@Visitor
|
@@root_base@@Visitor
|
||||||
####
|
####
|
||||||
@@@@body@struct_name
|
@@@@body@struct_name
|
||||||
virtual void
|
void
|
||||||
visit(@@struct_name@@& @@parameter_name@@) override;
|
visit(@@struct_name@@& @@parameter_name@@) override;
|
||||||
####
|
####
|
||||||
@@@@main
|
@@@@main
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user