diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt
index 06b15ababe..f1e403037b 100644
--- a/internal/core/src/query/CMakeLists.txt
+++ b/internal/core/src/query/CMakeLists.txt
@@ -8,6 +8,8 @@ set(MILVUS_QUERY_SRCS
     visitors/ExecExprVisitor.cpp
     visitors/VerifyPlanNodeVisitor.cpp
     visitors/VerifyExprVisitor.cpp
+    visitors/ExtractInfoPlanNodeVisitor.cpp
+    visitors/ExtractInfoExprVisitor.cpp
     Plan.cpp
     SearchOnGrowing.cpp
     SearchOnSealed.cpp
diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp
index 84bb68fcc6..056910f49b 100644
--- a/internal/core/src/query/Plan.cpp
+++ b/internal/core/src/query/Plan.cpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include "query/generated/VerifyPlanNodeVisitor.h"
+#include "query/generated/ExtractInfoPlanNodeVisitor.h"
 
 namespace milvus::query {
@@ -142,9 +143,14 @@
     VerifyPlanNodeVisitor verifier;
     vec_node->accept(verifier);
 
+    ExtractedPlanInfo plan_info(schema.size());
+    ExtractInfoPlanNodeVisitor extractor(plan_info);
+    vec_node->accept(extractor);
+
     auto plan = std::make_unique<Plan>(schema);
     plan->tag2field_ = std::move(tag2field_);
     plan->plan_node_ = std::move(vec_node);
+    plan->extra_info_opt_ = std::move(plan_info);
     return plan;
 }
diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h
index 8fe71c8cc5..a29c00bc48 100644
--- a/internal/core/src/query/PlanImpl.h
+++ b/internal/core/src/query/PlanImpl.h
@@ -20,11 +20,28 @@
 #include
 #include
 #include
+#include <optional>
+#include <boost/dynamic_bitset.hpp>
 
 namespace milvus::query {
 using Json = nlohmann::json;
 
 // class definitions
+
+struct ExtractedPlanInfo {
+ public:
+    explicit ExtractedPlanInfo(int64_t size) : involved_fields_(size) {
+    }
+
+    void
+    add_involved_field(FieldOffset field_offset) {
+        involved_fields_.set(field_offset.get());
+    }
+
+ public:
+    boost::dynamic_bitset<> involved_fields_;
+};
+
 struct Plan {
  public:
     explicit Plan(const Schema& schema) : schema_(schema) {
@@ -35,7 +52,9 @@ struct Plan {
     std::unique_ptr<VectorPlanNode> plan_node_;
     std::map<std::string, FieldOffset> tag2field_;  // PlaceholderName -> FieldOffset
     std::vector<FieldOffset> target_entries_;
-    std::vector<FieldOffset> referred_fields_;
+
+ public:
+    std::optional<ExtractedPlanInfo> extra_info_opt_;  // TODO: add move extra info
 };
diff --git a/internal/core/src/query/generated/.gitignore b/internal/core/src/query/generated/.gitignore
new file mode 100644
index 0000000000..cad3ab5945
--- /dev/null
+++ b/internal/core/src/query/generated/.gitignore
@@ -0,0 +1,3 @@
+!.gitignore
+*PlanNodeVisitor.cpp
+*ExprVisitor.cpp
\ No newline at end of file
diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.cpp b/internal/core/src/query/generated/ExecPlanNodeVisitor.cpp
deleted file mode 100644
index 0f237246b6..0000000000
--- a/internal/core/src/query/generated/ExecPlanNodeVisitor.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License
-
-#error TODO: copy this file out, and modify the content.
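
The `ExtractedPlanInfo` added to `PlanImpl.h` above is a one-bit-per-schema-field record of which fields a plan touches; the `check_search` added later in this patch compares it against a "loaded fields" bitset with a set difference. A minimal self-contained sketch of that bookkeeping, using illustrative field offsets rather than the real Milvus types:

```cpp
#include <boost/dynamic_bitset.hpp>
#include <cassert>
#include <iostream>

int main() {
    // One bit per schema field: bit i set <=> the plan involves field offset i.
    boost::dynamic_bitset<> involved_fields(4);  // 4-field schema (illustrative)
    involved_fields.set(0);                      // e.g. the vector field
    involved_fields.set(2);                      // e.g. a predicate field

    // Bits set as fields are loaded, as field_ready_bitset_ does below.
    boost::dynamic_bitset<> field_ready(4);
    field_ready.set(0);                          // only field 0 loaded so far

    // The same set difference check_search performs: requested minus ready.
    auto absent = involved_fields - field_ready;
    if (absent.any()) {
        // find_first() yields the offset of the first missing field.
        std::cout << "field offset " << absent.find_first() << " is not loaded\n";
    }
    assert(absent.test(2));
}
```
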
-#include "query/generated/ExecPlanNodeVisitor.h" - -namespace milvus::query { -void -ExecPlanNodeVisitor::visit(FloatVectorANNS& node) { - // TODO -} - -void -ExecPlanNodeVisitor::visit(BinaryVectorANNS& node) { - // TODO -} - -} // namespace milvus::query diff --git a/internal/core/src/query/generated/VerifyExprVisitor.cpp b/internal/core/src/query/generated/ExtractInfoExprVisitor.h similarity index 56% rename from internal/core/src/query/generated/VerifyExprVisitor.cpp rename to internal/core/src/query/generated/ExtractInfoExprVisitor.h index 44af4dde81..20b429cd8a 100644 --- a/internal/core/src/query/generated/VerifyExprVisitor.cpp +++ b/internal/core/src/query/generated/ExtractInfoExprVisitor.h @@ -9,28 +9,32 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#error TODO: copy this file out, and modify the content. -#include "query/generated/VerifyExprVisitor.h" +#pragma once +// Generated File +// DO NOT EDIT +#include "query/Plan.h" +#include "ExprVisitor.h" namespace milvus::query { -void -VerifyExprVisitor::visit(BoolUnaryExpr& expr) { - // TODO -} +class ExtractInfoExprVisitor : public ExprVisitor { + public: + void + visit(BoolUnaryExpr& expr) override; -void -VerifyExprVisitor::visit(BoolBinaryExpr& expr) { - // TODO -} + void + visit(BoolBinaryExpr& expr) override; -void -VerifyExprVisitor::visit(TermExpr& expr) { - // TODO -} + void + visit(TermExpr& expr) override; -void -VerifyExprVisitor::visit(RangeExpr& expr) { - // TODO -} + void + visit(RangeExpr& expr) override; + public: + explicit ExtractInfoExprVisitor(ExtractedPlanInfo& plan_info) : plan_info_(plan_info) { + } + + private: + ExtractedPlanInfo& plan_info_; +}; } // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecExprVisitor.cpp b/internal/core/src/query/generated/ExtractInfoPlanNodeVisitor.h similarity index 60% rename from internal/core/src/query/generated/ExecExprVisitor.cpp rename to internal/core/src/query/generated/ExtractInfoPlanNodeVisitor.h index b8ffed0f82..97b19b7289 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.cpp +++ b/internal/core/src/query/generated/ExtractInfoPlanNodeVisitor.h @@ -9,28 +9,26 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#error TODO: copy this file out, and modify the content. 
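
The `ExtractInfoExprVisitor.h` header above follows the usual generated-visitor shape: an abstract `ExprVisitor` with one `visit` overload per node type, and a concrete visitor that carries its output (`ExtractedPlanInfo&`) as a reference member. The mechanism it relies on is plain double dispatch; a compact sketch with hypothetical node names, not the real Milvus classes:

```cpp
#include <iostream>
#include <memory>
#include <vector>

struct TermExpr;
struct BinaryExpr;

// One visit() overload per concrete node type.
struct ExprVisitor {
    virtual ~ExprVisitor() = default;
    virtual void visit(TermExpr&) = 0;
    virtual void visit(BinaryExpr&) = 0;
};

struct Expr {
    virtual ~Expr() = default;
    virtual void accept(ExprVisitor&) = 0;  // dispatch on the node's dynamic type
};

struct TermExpr : Expr {
    explicit TermExpr(int field) : field_offset(field) {}
    int field_offset;
    void accept(ExprVisitor& v) override { v.visit(*this); }
};

struct BinaryExpr : Expr {
    std::unique_ptr<Expr> left, right;
    void accept(ExprVisitor& v) override { v.visit(*this); }
};

// Mirrors ExtractInfoExprVisitor: leaves record their field offset,
// interior nodes just recurse into both children.
struct CollectFields : ExprVisitor {
    std::vector<int> fields;
    void visit(TermExpr& e) override { fields.push_back(e.field_offset); }
    void visit(BinaryExpr& e) override {
        e.left->accept(*this);
        e.right->accept(*this);
    }
};

int main() {
    auto root = std::make_unique<BinaryExpr>();
    root->left = std::make_unique<TermExpr>(1);
    root->right = std::make_unique<TermExpr>(3);
    CollectFields collector;
    root->accept(collector);
    for (int f : collector.fields) std::cout << f << ' ';  // prints: 1 3
}
```
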
-#include "query/generated/ExecExprVisitor.h" +#pragma once +// Generated File +// DO NOT EDIT +#include "query/Plan.h" +#include "PlanNodeVisitor.h" namespace milvus::query { -void -ExecExprVisitor::visit(BoolUnaryExpr& expr) { - // TODO -} +class ExtractInfoPlanNodeVisitor : public PlanNodeVisitor { + public: + void + visit(FloatVectorANNS& node) override; -void -ExecExprVisitor::visit(BoolBinaryExpr& expr) { - // TODO -} + void + visit(BinaryVectorANNS& node) override; -void -ExecExprVisitor::visit(TermExpr& expr) { - // TODO -} - -void -ExecExprVisitor::visit(RangeExpr& expr) { - // TODO -} + public: + explicit ExtractInfoPlanNodeVisitor(ExtractedPlanInfo& plan_info) : plan_info_(plan_info) { + } + private: + ExtractedPlanInfo& plan_info_; +}; } // namespace milvus::query diff --git a/internal/core/src/query/generated/ShowExprVisitor.cpp b/internal/core/src/query/generated/ShowExprVisitor.cpp deleted file mode 100644 index cfb26067a4..0000000000 --- a/internal/core/src/query/generated/ShowExprVisitor.cpp +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#error TODO: copy this file out, and modify the content. -#include "query/generated/ShowExprVisitor.h" - -namespace milvus::query { -void -ShowExprVisitor::visit(BoolUnaryExpr& expr) { - // TODO -} - -void -ShowExprVisitor::visit(BoolBinaryExpr& expr) { - // TODO -} - -void -ShowExprVisitor::visit(TermExpr& expr) { - // TODO -} - -void -ShowExprVisitor::visit(RangeExpr& expr) { - // TODO -} - -} // namespace milvus::query diff --git a/internal/core/src/query/generated/ShowPlanNodeVisitor.cpp b/internal/core/src/query/generated/ShowPlanNodeVisitor.cpp deleted file mode 100644 index 2676e26cd7..0000000000 --- a/internal/core/src/query/generated/ShowPlanNodeVisitor.cpp +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#error TODO: copy this file out, and modify the content. 
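
`ExtractInfoPlanNodeVisitor`, declared above, is the entry point of the extraction: an ANNS node always involves its own vector field, and its optional predicate subtree may involve more. A toy model of that walk, under the assumption of a single-leaf predicate (the real visitor recurses through the expression tree instead; names here are illustrative):

```cpp
#include <boost/dynamic_bitset.hpp>
#include <memory>
#include <optional>

struct PredExpr {
    int field_offset;
};

struct ANNSNode {
    int vector_field_offset;
    std::optional<std::unique_ptr<PredExpr>> predicate;
};

void extract(const ANNSNode& node, boost::dynamic_bitset<>& involved) {
    involved.set(node.vector_field_offset);
    if (node.predicate.has_value()) {  // same guard the real visitor uses below
        involved.set(node.predicate.value()->field_offset);
    }
}

int main() {
    ANNSNode node{0, std::make_unique<PredExpr>(PredExpr{2})};
    boost::dynamic_bitset<> involved(4);
    extract(node, involved);
    return involved.test(0) && involved.test(2) ? 0 : 1;
}
```
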
-#include "query/generated/ShowPlanNodeVisitor.h" - -namespace milvus::query { -void -ShowPlanNodeVisitor::visit(FloatVectorANNS& node) { - // TODO -} - -void -ShowPlanNodeVisitor::visit(BinaryVectorANNS& node) { - // TODO -} - -} // namespace milvus::query diff --git a/internal/core/src/query/generated/VerifyPlanNodeVisitor.cpp b/internal/core/src/query/generated/VerifyPlanNodeVisitor.cpp deleted file mode 100644 index c7b0656f57..0000000000 --- a/internal/core/src/query/generated/VerifyPlanNodeVisitor.cpp +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#error TODO: copy this file out, and modify the content. -#include "query/generated/VerifyPlanNodeVisitor.h" - -namespace milvus::query { -void -VerifyPlanNodeVisitor::visit(FloatVectorANNS& node) { - // TODO -} - -void -VerifyPlanNodeVisitor::visit(BinaryVectorANNS& node) { - // TODO -} - -} // namespace milvus::query diff --git a/internal/core/src/query/visitors/ExtractInfoExprVisitor.cpp b/internal/core/src/query/visitors/ExtractInfoExprVisitor.cpp new file mode 100644 index 0000000000..97e20c9185 --- /dev/null +++ b/internal/core/src/query/visitors/ExtractInfoExprVisitor.cpp @@ -0,0 +1,52 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include "query/Plan.h" +#include "query/generated/ExtractInfoExprVisitor.h" + +namespace milvus::query { +#if 1 +namespace impl { +// THIS CONTAINS EXTRA BODY FOR VISITOR +// WILL BE USED BY GENERATOR UNDER suvlim/core_gen/ +class ExtractInfoExprVisitor : ExprVisitor { + public: + explicit ExtractInfoExprVisitor(ExtractedPlanInfo& plan_info) : plan_info_(plan_info) { + } + + private: + ExtractedPlanInfo& plan_info_; +}; +} // namespace impl +#endif + +void +ExtractInfoExprVisitor::visit(BoolUnaryExpr& expr) { + expr.child_->accept(*this); +} + +void +ExtractInfoExprVisitor::visit(BoolBinaryExpr& expr) { + expr.left_->accept(*this); + expr.right_->accept(*this); +} + +void +ExtractInfoExprVisitor::visit(TermExpr& expr) { + plan_info_.add_involved_field(expr.field_offset_); +} + +void +ExtractInfoExprVisitor::visit(RangeExpr& expr) { + plan_info_.add_involved_field(expr.field_offset_); +} + +} // namespace milvus::query diff --git a/internal/core/src/query/visitors/ExtractInfoPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExtractInfoPlanNodeVisitor.cpp new file mode 100644 index 0000000000..717785b18a --- /dev/null +++ b/internal/core/src/query/visitors/ExtractInfoPlanNodeVisitor.cpp @@ -0,0 +1,51 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include "query/Plan.h" +#include "query/generated/ExtractInfoPlanNodeVisitor.h" +#include "query/generated/ExtractInfoExprVisitor.h" + +namespace milvus::query { + +#if 1 +namespace impl { +// THIS CONTAINS EXTRA BODY FOR VISITOR +// WILL BE USED BY GENERATOR UNDER suvlim/core_gen/ +class ExtractInfoPlanNodeVisitor : PlanNodeVisitor { + public: + explicit ExtractInfoPlanNodeVisitor(ExtractedPlanInfo& plan_info) : plan_info_(plan_info) { + } + + private: + ExtractedPlanInfo& plan_info_; +}; +} // namespace impl +#endif + +void +ExtractInfoPlanNodeVisitor::visit(FloatVectorANNS& node) { + plan_info_.add_involved_field(node.query_info_.field_offset_); + if (node.predicate_.has_value()) { + ExtractInfoExprVisitor expr_visitor(plan_info_); + node.predicate_.value()->accept(expr_visitor); + } +} + +void +ExtractInfoPlanNodeVisitor::visit(BinaryVectorANNS& node) { + plan_info_.add_involved_field(node.query_info_.field_offset_); + if (node.predicate_.has_value()) { + ExtractInfoExprVisitor expr_visitor(plan_info_); + node.predicate_.value()->accept(expr_visitor); + } +} + +} // namespace milvus::query diff --git a/internal/core/src/segcore/SealedIndexingRecord.h b/internal/core/src/segcore/SealedIndexingRecord.h index 07607bbd2d..62a8362071 100644 --- a/internal/core/src/segcore/SealedIndexingRecord.h +++ b/internal/core/src/segcore/SealedIndexingRecord.h @@ -46,6 +46,12 @@ struct SealedIndexingRecord { return field_indexings_.at(field_offset).get(); } + void + drop_field_indexing(FieldOffset field_offset) { + std::unique_lock lck(mutex_); + field_indexings_.erase(field_offset); + } + bool is_ready(FieldOffset field_offset) const { std::shared_lock lck(mutex_); diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 9776936c7c..54c3bb0093 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -158,9 +158,6 @@ class SegmentGrowingImpl : public SegmentGrowing { bulk_subscript_impl(*vec_ptr, seg_offsets, count, output); } - int64_t - num_chunk() const override; - Status LoadIndexing(const LoadIndexInfo& info) override; @@ -188,9 +185,17 @@ class SegmentGrowingImpl : public SegmentGrowing { get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false); protected: + int64_t + num_chunk() const override; + SpanBase chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const override; + void + check_search(const query::Plan* plan) const override { + Assert(plan); + } + private: int64_t size_per_chunk_; SchemaPtr schema_; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 0fa5fcbb60..313d6f4f1e 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -48,6 +48,7 @@ SegmentInternalInterface::Search(const query::Plan* plan, const Timestamp* timestamps, int64_t num_groups) const { std::shared_lock lck(mutex_); + check_search(plan); Assert(num_groups == 1); query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); auto results = visitor.get_moved_result(*plan->plan_node_); diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index ad31bfd70d..a9460d5895 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ 
b/internal/core/src/segcore/SegmentInterface.h @@ -117,6 +117,9 @@ class SegmentInternalInterface : public SegmentInterface { virtual void bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + virtual void + check_search(const query::Plan* plan) const = 0; + protected: mutable std::shared_mutex mutex_; }; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 7558259e6b..273aeab3e5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -31,7 +31,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { } Assert(!vec_indexings_.is_ready(field_offset)); vec_indexings_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index); - ++ready_count_; + set_field_ready(field_offset, true); } void @@ -70,10 +70,10 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { // write data under lock std::unique_lock lck(mutex_); update_row_count(info.row_count); - AssertInfo(columns_data_[field_offset.get()].empty(), "already exists"); - columns_data_[field_offset.get()] = std::move(vec_data); + AssertInfo(field_datas_[field_offset.get()].empty(), "already exists"); + field_datas_[field_offset.get()] = std::move(vec_data); - ++ready_count_; + set_field_ready(field_offset, true); } } @@ -96,10 +96,10 @@ SegmentSealedImpl::size_per_chunk() const { SpanBase SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const { std::shared_lock lck(mutex_); + Assert(is_field_ready(field_offset)); auto& field_meta = schema_->operator[](field_offset); auto element_sizeof = field_meta.get_sizeof(); - Assert(is_all_ready()); - SpanBase base(columns_data_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof); + SpanBase base(field_datas_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof); return base; } @@ -143,13 +143,39 @@ SegmentSealedImpl::vector_search(int64_t vec_count, } void SegmentSealedImpl::DropFieldData(const FieldId field_id) { - std::unique_lock lck(mutex_); - PanicInfo("unimplemented"); + if (SystemProperty::Instance().IsSystem(field_id)) { + auto system_field_type = SystemProperty::Instance().GetSystemFieldType(field_id); + Assert(system_field_type == SystemFieldType::RowId); + + std::unique_lock lck(mutex_); + --system_ready_count_; + auto row_ids = std::move(row_ids_); + lck.unlock(); + + row_ids.clear(); + } else { + auto field_offset = schema_->get_offset(field_id); + auto& field_meta = schema_->operator[](field_offset); + Assert(!field_meta.is_vector()); + + std::unique_lock lck(mutex_); + set_field_ready(field_offset, false); + auto vec = std::move(field_datas_[field_offset.get()]); + lck.unlock(); + + vec.clear(); + } } + void SegmentSealedImpl::DropIndex(const FieldId field_id) { + Assert(!SystemProperty::Instance().IsSystem(field_id)); + auto field_offset = schema_->get_offset(field_id); + auto& field_meta = schema_->operator[](field_offset); + Assert(field_meta.is_vector()); + std::unique_lock lck(mutex_); - PanicInfo("unimplemented"); + vec_indexings_.drop_field_indexing(field_offset); } SegmentSealedPtr diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index faf1a9f498..89f2ba4674 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -18,7 +18,8 @@ namespace milvus::segcore { class SegmentSealedImpl : public 
SegmentSealed { public: - explicit SegmentSealedImpl(SchemaPtr schema) : schema_(schema), columns_data_(schema->size()) { + explicit SegmentSealedImpl(SchemaPtr schema) + : schema_(schema), field_datas_(schema->size()), field_ready_bitset_(schema->size()) { } void LoadIndex(const LoadIndexInfo& info) override; @@ -65,7 +66,7 @@ class SegmentSealedImpl : public SegmentSealed { const int64_t* seg_offsets, int64_t count, void* output) const override { - Assert(is_all_ready()); + Assert(is_system_field_ready()); Assert(system_type == SystemFieldType::RowId); bulk_subscript_impl(row_ids_.data(), seg_offsets, count, output); } @@ -74,10 +75,29 @@ class SegmentSealedImpl : public SegmentSealed { // where Vec is determined from field_offset void bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { - Assert(is_all_ready()); + Assert(is_field_ready(field_offset)); auto& field_meta = schema_->operator[](field_offset); Assert(field_meta.get_data_type() == DataType::INT64); - bulk_subscript_impl(columns_data_[field_offset.get()].data(), seg_offsets, count, output); + bulk_subscript_impl(field_datas_[field_offset.get()].data(), seg_offsets, count, output); + } + + void + check_search(const query::Plan* plan) const override { + Assert(plan); + Assert(plan->extra_info_opt_.has_value()); + + if (!is_system_field_ready()) { + PanicInfo("System Field RowID is not loaded"); + } + + auto& request_fields = plan->extra_info_opt_.value().involved_fields_; + Assert(request_fields.size() == field_ready_bitset_.size()); + auto absent_fields = request_fields - field_ready_bitset_; + if (absent_fields.any()) { + auto field_offset = FieldOffset(absent_fields.find_first()); + auto& field_meta = schema_->operator[](field_offset); + PanicInfo("User Field(" + field_meta.get_name().get() + ") is not loaded"); + } } private: @@ -116,25 +136,25 @@ class SegmentSealedImpl : public SegmentSealed { } bool - is_all_ready() const { - // TODO: optimize here - // NOTE: including row_ids - if (!is_system_field_ready()) { - return false; - } - return ready_count_ == schema_->size(); + is_field_ready(FieldOffset field_offset) const { + return field_ready_bitset_.test(field_offset.get()); + } + + void + set_field_ready(FieldOffset field_offset, bool flag = true) { + field_ready_bitset_[field_offset.get()] = flag; } private: // segment loading state - std::atomic ready_count_ = 0; + boost::dynamic_bitset<> field_ready_bitset_; std::atomic system_ready_count_ = 0; // segment datas // TODO: generate index for scalar std::optional row_count_opt_; std::map scalar_indexings_; SealedIndexingRecord vec_indexings_; - std::vector> columns_data_; + std::vector> field_datas_; aligned_vector row_ids_; SchemaPtr schema_; }; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 713fdb6b06..0c4ffe5182 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -249,8 +249,9 @@ TEST(Sealed, LoadFieldData) { auto metric_type = MetricType::METRIC_L2; auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); - schema->AddDebugField("counter", DataType::INT64); - schema->AddDebugField("double", DataType::DOUBLE); + auto counter_id = schema->AddDebugField("counter", DataType::INT64); + auto double_id = schema->AddDebugField("double", DataType::DOUBLE); + auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); auto dataset = 
DataGen(schema, N); @@ -268,24 +269,6 @@ TEST(Sealed, LoadFieldData) { indexing->AddWithoutIds(database, conf); auto segment = CreateSealedSegment(schema); - SealedLoader(dataset, *segment); - { - LoadIndexInfo vec_info; - vec_info.field_id = fakevec_id.get(); - vec_info.field_name = "fakevec"; - vec_info.index = indexing; - vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; - segment->LoadIndex(vec_info); - } - ASSERT_EQ(segment->num_chunk(), 1); - auto chunk_span1 = segment->chunk_data(FieldOffset(1), 0); - auto chunk_span2 = segment->chunk_data(FieldOffset(2), 0); - auto ref1 = dataset.get_col(1); - auto ref2 = dataset.get_col(2); - for (int i = 0; i < N; ++i) { - ASSERT_EQ(chunk_span1[i], ref1[i]); - ASSERT_EQ(chunk_span2[i], ref2[i]); - } std::string dsl = R"({ "bool": { "must": [ @@ -313,14 +296,47 @@ TEST(Sealed, LoadFieldData) { } })"; + Timestamp time = 1000000; auto plan = CreatePlan(*schema, dsl); auto num_queries = 5; auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); - Timestamp time = 1000000; std::vector ph_group_arr = {ph_group.get()}; + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); + + SealedLoader(dataset, *segment); + segment->DropFieldData(nothing_id); + + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); + + LoadIndexInfo vec_info; + vec_info.field_id = fakevec_id.get(); + vec_info.field_name = "fakevec"; + vec_info.index = indexing; + vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; + segment->LoadIndex(vec_info); + + ASSERT_EQ(segment->num_chunk(), 1); + auto chunk_span1 = segment->chunk_data(FieldOffset(1), 0); + auto chunk_span2 = segment->chunk_data(FieldOffset(2), 0); + auto ref1 = dataset.get_col(1); + auto ref2 = dataset.get_col(2); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(chunk_span1[i], ref1[i]); + ASSERT_EQ(chunk_span2[i], ref2[i]); + } + auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); auto json = QueryResultToJson(qr); std::cout << json.dump(1); + + segment->DropIndex(fakevec_id); + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); + segment->LoadIndex(vec_info); + auto qr2 = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); + auto json2 = QueryResultToJson(qr); + ASSERT_EQ(json.dump(-2), json2.dump(-2)); + segment->DropFieldData(double_id); + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); } diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmq/rmq_msgstream.go similarity index 99% rename from internal/msgstream/rmqms/rmq_msgstream.go rename to internal/msgstream/rmq/rmq_msgstream.go index 5c20b24c27..8656bb2ec9 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmq/rmq_msgstream.go @@ -1,4 +1,4 @@ -package rmqms +package rmqmsgstream import ( "context" diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 3c45dc7a86..1e6ab95506 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -54,9 +54,6 @@ type collectionReplica interface { getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) hasPartition(collectionID UniqueID, partitionTag string) bool - enablePartitionDM(collectionID UniqueID, partitionID UniqueID) 
error - disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error - getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) // segment getSegmentNum() int @@ -365,43 +362,6 @@ func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, par return false } -func (colReplica *collectionReplicaImpl) enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) - if err != nil { - return err - } - - partition.enableDM = true - return nil -} - -func (colReplica *collectionReplicaImpl) disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) - if err != nil { - return err - } - - partition.enableDM = false - return nil -} - -func (colReplica *collectionReplicaImpl) getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) - if err != nil { - return false, err - } - return partition.enableDM, nil -} - //----------------------------------------------------------------------------------------------------- segment func (colReplica *collectionReplicaImpl) getSegmentNum() int { colReplica.mu.RLock() diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 04856af94d..608006ec61 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -19,18 +19,17 @@ type dataSyncService struct { } func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService { - service := &dataSyncService{ + + return &dataSyncService{ ctx: ctx, fg: nil, replica: replica, } - - service.initNodes() - return service } func (dsService *dataSyncService) start() { + dsService.initNodes() dsService.fg.Start() } @@ -48,7 +47,7 @@ func (dsService *dataSyncService) initNodes() { var dmStreamNode node = dsService.newDmInputNode(dsService.ctx) var ddStreamNode node = dsService.newDDInputNode(dsService.ctx) - var filterDmNode node = newFilteredDmNode(dsService.replica) + var filterDmNode node = newFilteredDmNode() var ddNode node = newDDNode(dsService.replica) var insertNode node = newInsertNode(dsService.replica) diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 5a75be2c56..acef0f2b04 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -12,8 +12,7 @@ import ( type filterDmNode struct { baseNode - ddMsg *ddMsg - replica collectionReplica + ddMsg *ddMsg } func (fdmNode *filterDmNode) Name() string { @@ -103,12 +102,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg { - // TODO: open this check - // check if partition dm enable - //if enable, _ := fdmNode.replica.getEnablePartitionDM(msg.CollectionID, msg.PartitionID); !enable { - // return nil - //} - // No dd record, do all insert requests. 
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName] if !ok { @@ -161,7 +154,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg return msg } -func newFilteredDmNode(replica collectionReplica) *filterDmNode { +func newFilteredDmNode() *filterDmNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -171,6 +164,5 @@ func newFilteredDmNode(replica collectionReplica) *filterDmNode { return &filterDmNode{ baseNode: baseNode, - replica: replica, } } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 7a41c50e3e..e5e76d24a8 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -12,11 +12,9 @@ import ( type ParamTable struct { paramtable.BaseTable - PulsarAddress string - ETCDAddress string - MetaRootPath string - WriteNodeSegKvSubPath string - IndexBuilderAddress string + PulsarAddress string + ETCDAddress string + MetaRootPath string QueryNodeIP string QueryNodePort int64 @@ -133,8 +131,6 @@ func (p *ParamTable) Init() { p.initPulsarAddress() p.initETCDAddress() p.initMetaRootPath() - p.initWriteNodeSegKvSubPath() - p.initIndexBuilderAddress() p.initGracefulTime() p.initMsgChannelSubName() @@ -250,14 +246,6 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } -func (p *ParamTable) initIndexBuilderAddress() { - ret, err := p.Load("_IndexBuilderAddress") - if err != nil { - panic(err) - } - p.IndexBuilderAddress = ret -} - func (p *ParamTable) initInsertChannelRange() { insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { @@ -350,14 +338,6 @@ func (p *ParamTable) initMetaRootPath() { p.MetaRootPath = rootPath + "/" + subPath } -func (p *ParamTable) initWriteNodeSegKvSubPath() { - subPath, err := p.Load("etcd.writeNodeSegKvSubPath") - if err != nil { - panic(err) - } - p.WriteNodeSegKvSubPath = subPath + "/" -} - func (p *ParamTable) initGracefulTime() { p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") } diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 5b835b8616..1b5279f113 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -16,7 +16,6 @@ type Partition struct { partitionTag string id UniqueID segments []*Segment - enableDM bool } func (p *Partition) ID() UniqueID { @@ -34,7 +33,6 @@ func (p *Partition) Segments() *[]*Segment { func newPartition2(partitionTag string) *Partition { var newPartition = &Partition{ partitionTag: partitionTag, - enableDM: false, } return newPartition @@ -42,8 +40,7 @@ func newPartition2(partitionTag string) *Partition { func newPartition(partitionID UniqueID) *Partition { var newPartition = &Partition{ - id: partitionID, - enableDM: false, + id: partitionID, } return newPartition diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 7b8270877b..72f583cd30 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -136,7 +136,7 @@ func (node *QueryNode) Start() error { node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan) - node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan) + node.segManager = 
newSegmentManager(node.queryNodeLoopCtx, node.replica, node.loadIndexService.loadIndexReqChan) // start services go node.dataSyncService.start() @@ -344,31 +344,14 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S segmentIDs := in.SegmentIDs fieldIDs := in.FieldIDs - err := node.replica.enablePartitionDM(collectionID, partitionID) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - } - return status, err - } - // segments are ordered before LoadSegments calling if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing { segmentNum := len(segmentIDs) - positions := in.LastSegmentState.StartPositions - err = node.segManager.seekSegment(positions) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - } - return status, err - } + node.segManager.seekSegment(segmentIDs[segmentNum-1]) segmentIDs = segmentIDs[:segmentNum-1] } - err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) + err := node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -380,17 +363,6 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S } func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { - for _, id := range in.PartitionIDs { - err := node.replica.enablePartitionDM(in.CollectionID, id) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - } - return status, err - } - } - // release all fields in the segments for _, id := range in.SegmentIDs { err := node.segManager.releaseSegment(id) diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go index 0a189bbde9..72c880976b 100644 --- a/internal/querynode/segment_manager.go +++ b/internal/querynode/segment_manager.go @@ -5,7 +5,6 @@ import ( "errors" "strconv" - indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -14,31 +13,52 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/storage" - writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client" ) type segmentManager struct { replica collectionReplica - dmStream msgstream.MsgStream loadIndexReqChan chan []msgstream.TsMsg - dataClient *writerclient.Client - indexClient *indexnodeclient.Client + // TODO: replace by client instead of grpc client + dataClient datapb.DataServiceClient + indexBuilderClient indexpb.IndexServiceClient kv kv.Base // minio kv iCodec *storage.InsertCodec } -func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error { - // TODO: open seek - //for _, position := range positions { - // err := s.dmStream.Seek(position) - // if err != nil { - // return err - // } - //} - return nil +func newSegmentManager(ctx context.Context, replica collectionReplica, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager { + bucketName := Params.MinioBucketName + option := &miniokv.Option{ + Address: 
Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + BucketName: bucketName, + CreateBucket: true, + } + + minioKV, err := miniokv.NewMinIOKV(ctx, option) + if err != nil { + panic(err) + } + + return &segmentManager{ + replica: replica, + loadIndexReqChan: loadIndexReqChan, + + // TODO: init clients + dataClient: nil, + indexBuilderClient: nil, + + kv: minioKV, + iCodec: &storage.InsertCodec{}, + } +} + +func (s *segmentManager) seekSegment(segmentID UniqueID) { + // TODO: impl } func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error { @@ -61,11 +81,7 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID } targetFields := s.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) - // replace segment - err = s.replica.removeSegment(segmentID) - if err != nil { - return err - } + // create segment err = s.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) if err != nil { return err @@ -102,25 +118,16 @@ func (s *segmentManager) getInsertBinlogPaths(segmentID UniqueID) ([]*internalPb SegmentID: segmentID, } - pathResponse, err := s.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest.SegmentID) + pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), insertBinlogPathRequest) if err != nil { return nil, nil, err } - //if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { - // return nil, nil, errors.New("illegal InsertBinlogPathsResponse") - //} - - fieldIDs := make([]int64, 0) - paths := make([]*internalPb.StringList, 0) - for k, v := range pathResponse { - fieldIDs = append(fieldIDs, k) - paths = append(paths, &internalPb.StringList{ - Values: v, - }) + if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { + return nil, nil, errors.New("illegal InsertBinlogPathsResponse") } - return paths, fieldIDs, nil + return pathResponse.Paths, pathResponse.FieldIDs, nil } func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList { @@ -227,15 +234,12 @@ func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) { indexFilePathRequest := &indexpb.IndexFilePathsRequest{ IndexIDs: []UniqueID{indexID}, } - pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest.IndexIDs) - //if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - // return nil, err - //} - if err != nil { + pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexFilePathRequest) + if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { return nil, err } - return pathResponse[0], nil + return pathResponse.FilePaths[0].IndexFilePaths, nil } func (s *segmentManager) getIndexParam() (indexParam, error) { @@ -289,42 +293,3 @@ func (s *segmentManager) sendLoadIndex(indexPaths []string, messages := []msgstream.TsMsg{loadIndexMsg} s.loadIndexReqChan <- messages } - -func newSegmentManager(ctx context.Context, replica collectionReplica, dmStream msgstream.MsgStream, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager { - bucketName := Params.MinioBucketName - option := &miniokv.Option{ - Address: Params.MinioEndPoint, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSLStr, - BucketName: bucketName, - CreateBucket: true, - } - - minioKV, 
err := miniokv.NewMinIOKV(ctx, option) - if err != nil { - panic(err) - } - - dataClient, err := writerclient.NewWriterClient(Params.ETCDAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, nil) - if err != nil { - panic(err) - } - - indexClient, err := indexnodeclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress) - if err != nil { - panic(err) - } - - return &segmentManager{ - replica: replica, - dmStream: dmStream, - loadIndexReqChan: loadIndexReqChan, - - dataClient: dataClient, - indexClient: indexClient, - - kv: minioKV, - iCodec: &storage.InsertCodec{}, - } -} diff --git a/internal/querynode/segment_manager_test.go b/internal/querynode/segment_manager_test.go index 9686e9f4b9..015703579e 100644 --- a/internal/querynode/segment_manager_test.go +++ b/internal/querynode/segment_manager_test.go @@ -16,9 +16,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/indexnode" minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" - "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -26,7 +23,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/storage" ) -func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string) ([]*internalPb.StringList, []int64, error) { +func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) ([]*internalPb.StringList, []int64, error) { const ( msgLength = 1000 DIM = 16 @@ -111,8 +108,10 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID } // binLogs -> minIO/S3 + collIDStr := strconv.FormatInt(collectionID, 10) + partitionIDStr := strconv.FormatInt(partitionID, 10) segIDStr := strconv.FormatInt(segmentID, 10) - keyPrefix = path.Join(keyPrefix, segIDStr) + keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", collIDStr, partitionIDStr, segIDStr) paths := make([]*internalPb.StringList, 0) fieldIDs := make([]int64, 0) @@ -215,197 +214,18 @@ func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { return indexPaths, indexParams, nil } -func doInsert(ctx context.Context, collectionName string, partitionTag string, segmentID UniqueID) error { - const msgLength = 1000 - const DIM = 16 - - var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} - var rawData []byte - for _, ele := range vec { - buf := make([]byte, 4) - binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) - rawData = append(rawData, buf...) - } - bs := make([]byte, 4) - binary.LittleEndian.PutUint32(bs, 1) - rawData = append(rawData, bs...) 
- - timeRange := TimeRange{ - timestampMin: 0, - timestampMax: math.MaxUint64, - } - - // messages generate - insertMessages := make([]msgstream.TsMsg, 0) - for i := 0; i < msgLength; i++ { - var msg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{ - uint32(i), - }, - }, - InsertRequest: internalPb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kInsert, - MsgID: 0, - Timestamp: uint64(i + 1000), - SourceID: 0, - }, - CollectionName: collectionName, - PartitionName: partitionTag, - SegmentID: segmentID, - ChannelID: "0", - Timestamps: []uint64{uint64(i + 1000)}, - RowIDs: []int64{int64(i)}, - RowData: []*commonpb.Blob{ - {Value: rawData}, - }, - }, - } - insertMessages = append(insertMessages, msg) - } - - msgPack := msgstream.MsgPack{ - BeginTs: timeRange.timestampMin, - EndTs: timeRange.timestampMax, - Msgs: insertMessages, - } - - // generate timeTick - timeTickMsgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 1000, - EndTimestamp: 1500, - HashValues: []uint32{0}, - } - timeTickResult := internalPb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kTimeTick, - MsgID: 0, - Timestamp: 1000, - SourceID: 0, - }, - } - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) - - // pulsar produce - const receiveBufSize = 1024 - insertChannels := Params.InsertChannelNames - ddChannels := Params.DDChannelNames - pulsarURL := Params.PulsarAddress - - insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarClient(pulsarURL) - insertStream.CreatePulsarProducers(insertChannels) - unmarshalDispatcher := util.NewUnmarshalDispatcher() - insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize) - - ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) - ddStream.SetPulsarClient(pulsarURL) - ddStream.CreatePulsarProducers(ddChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - - err := insertMsgStream.Produce(&msgPack) - if err != nil { - return err - } - - err = insertMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - err = ddMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - - //messages := insertStream.Consume() - //for _, msg := range messages.Msgs { - // - //} - - return nil -} - -func sentTimeTick(ctx context.Context) error { - timeTickMsgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 1500, - EndTimestamp: 2000, - HashValues: []uint32{0}, - } - timeTickResult := internalPb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kTimeTick, - MsgID: 0, - Timestamp: math.MaxUint64, - SourceID: 0, - }, - } - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) - - // pulsar produce - const receiveBufSize = 1024 - insertChannels := Params.InsertChannelNames - ddChannels := Params.DDChannelNames - pulsarURL := Params.PulsarAddress - - insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarClient(pulsarURL) - insertStream.CreatePulsarProducers(insertChannels) - unmarshalDispatcher := util.NewUnmarshalDispatcher() - 
insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize) - - ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) - ddStream.SetPulsarClient(pulsarURL) - ddStream.CreatePulsarProducers(ddChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - - err := insertMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - err = ddMsgStream.Broadcast(&timeTickMsgPack) - if err != nil { - return err - } - return nil -} - func TestSegmentManager_load_release_and_search(t *testing.T) { collectionID := UniqueID(0) partitionID := UniqueID(1) segmentID := UniqueID(2) fieldIDs := []int64{0, 101} - // mock write insert bin log - keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10)) - Params.WriteNodeSegKvSubPath = keyPrefix - node := newQueryNodeMock() defer node.Stop() ctx := node.queryNodeLoopCtx node.loadIndexService = newLoadIndexService(ctx, node.replica) - node.segManager = newSegmentManager(ctx, node.replica, nil, node.loadIndexService.loadIndexReqChan) + node.segManager = newSegmentManager(ctx, node.replica, node.loadIndexService.loadIndexReqChan) go node.loadIndexService.start() collectionName := "collection0" @@ -417,7 +237,7 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) assert.NoError(t, err) - paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) + paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID) assert.NoError(t, err) fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) @@ -479,111 +299,3 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { <-ctx.Done() } - -//// NOTE: start pulsar before test -//func TestSegmentManager_with_seek(t *testing.T) { -// collectionID := UniqueID(0) -// partitionID := UniqueID(1) -// //segmentID := UniqueID(2) -// fieldIDs := []int64{0, 101} -// -// //// mock write insert bin log -// //keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10)) -// //Params.WriteNodeSegKvSubPath = keyPrefix + "/" -// node := newQueryNodeMock() -// -// ctx := node.queryNodeLoopCtx -// go node.Start() -// -// collectionName := "collection0" -// initTestMeta(t, node, collectionName, collectionID, 0) -// -// err := node.replica.addPartition(collectionID, partitionID) -// assert.NoError(t, err) -// -// //err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) -// //assert.NoError(t, err) -// -// //paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) -// //assert.NoError(t, err) -// -// //fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) -// //assert.Equal(t, len(fieldsMap), 2) -// -// segmentIDToInsert := UniqueID(3) -// err = doInsert(ctx, collectionName, "default", segmentIDToInsert) -// assert.NoError(t, err) -// -// startPositions := make([]*internalPb.MsgPosition, 0) -// for _, ch := range Params.InsertChannelNames { -// startPositions = append(startPositions, &internalPb.MsgPosition{ -// ChannelName: ch, -// }) -// } -// var positions []*internalPb.MsgPosition -// lastSegStates := 
&datapb.SegmentStatesResponse{ -// State: datapb.SegmentState_SegmentGrowing, -// StartPositions: positions, -// } -// loadReq := &querypb.LoadSegmentRequest{ -// CollectionID: collectionID, -// PartitionID: partitionID, -// SegmentIDs: []UniqueID{segmentIDToInsert}, -// FieldIDs: fieldIDs, -// LastSegmentState: lastSegStates, -// } -// _, err = node.LoadSegments(loadReq) -// assert.NoError(t, err) -// -// err = sentTimeTick(ctx) -// assert.NoError(t, err) -// -// // do search -// dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }" -// -// const DIM = 16 -// var searchRawData []byte -// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} -// for _, ele := range vec { -// buf := make([]byte, 4) -// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) -// searchRawData = append(searchRawData, buf...) -// } -// placeholderValue := milvuspb.PlaceholderValue{ -// Tag: "$0", -// Type: milvuspb.PlaceholderType_VECTOR_FLOAT, -// Values: [][]byte{searchRawData}, -// } -// -// placeholderGroup := milvuspb.PlaceholderGroup{ -// Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue}, -// } -// -// placeHolderGroupBlob, err := proto.Marshal(&placeholderGroup) -// assert.NoError(t, err) -// -// //searchTimestamp := Timestamp(1020) -// collection, err := node.replica.getCollectionByID(collectionID) -// assert.NoError(t, err) -// plan, err := createPlan(*collection, dslString) -// assert.NoError(t, err) -// holder, err := parserPlaceholderGroup(plan, placeHolderGroupBlob) -// assert.NoError(t, err) -// placeholderGroups := make([]*PlaceholderGroup, 0) -// placeholderGroups = append(placeholderGroups, holder) -// -// // wait for segment building index -// time.Sleep(3 * time.Second) -// -// //segment, err := node.replica.getSegmentByID(segmentIDToInsert) -// //assert.NoError(t, err) -// //_, err = segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp}) -// //assert.Nil(t, err) -// -// plan.delete() -// holder.delete() -// -// <-ctx.Done() -// err = node.Stop() -// assert.NoError(t, err) -//} diff --git a/internal/util/rocksmq/global_allocator.go b/internal/util/rocksmq/global_allocator.go deleted file mode 100644 index d105003df0..0000000000 --- a/internal/util/rocksmq/global_allocator.go +++ /dev/null @@ -1,167 +0,0 @@ -package rocksmq - -import ( - "errors" - "log" - "sync/atomic" - "time" - - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" -) - -// Allocator is a Timestamp Oracle allocator. -type Allocator interface { - // Initialize is used to initialize a TSO allocator. - // It will synchronize TSO with etcd and initialize the - // memory for later allocation work. - Initialize() error - // UpdateTSO is used to update the TSO in memory and the time window in etcd. - UpdateTSO() error - // SetTSO sets the physical part with given tso. It's mainly used for BR restore - // and can not forcibly set the TSO smaller than now. - SetTSO(tso uint64) error - // GenerateTSO is used to generate a given number of TSOs. - // Make sure you have initialized the TSO allocator before calling. - GenerateTSO(count uint32) (uint64, error) - // Reset is used to reset the TSO allocator. - Reset() -} - -// GlobalTSOAllocator is the global single point TSO allocator. 
-type GlobalTSOAllocator struct { - tso *timestampOracle -} - -// NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { - var saveInterval = 3 * time.Second - return &GlobalTSOAllocator{ - tso: ×tampOracle{ - kvBase: kvBase, - saveInterval: saveInterval, - maxResetTSGap: func() time.Duration { return 3 * time.Second }, - key: key, - }, - } -} - -// Initialize will initialize the created global TSO allocator. -func (gta *GlobalTSOAllocator) Initialize() error { - return gta.tso.InitTimestamp() -} - -// UpdateTSO is used to update the TSO in memory and the time window in etcd. -func (gta *GlobalTSOAllocator) UpdateTSO() error { - return gta.tso.UpdateTimestamp() -} - -// SetTSO sets the physical part with given tso. -func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { - return gta.tso.ResetUserTimestamp(tso) -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { - var physical, logical int64 - if count == 0 { - return 0, errors.New("tso count should be positive") - } - - maxRetryCount := 10 - - for i := 0; i < maxRetryCount; i++ { - current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) - if current == nil || current.physical.Equal(typeutil.ZeroTime) { - // If it's leader, maybe SyncTimestamp hasn't completed yet - log.Println("sync hasn't completed yet, wait for a while") - time.Sleep(200 * time.Millisecond) - continue - } - - physical = current.physical.UnixNano() / int64(time.Millisecond) - logical = atomic.AddInt64(¤t.logical, int64(count)) - if logical >= maxLogical { - log.Println("logical part outside of max logical interval, please check ntp time", - zap.Int("retry-count", i)) - time.Sleep(UpdateTimestampStep) - continue - } - return tsoutil.ComposeTS(physical, logical), nil - } - return 0, errors.New("can not get timestamp") -} - -func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) { - //return gta.tso.SyncTimestamp() - start, err := gta.GenerateTSO(count) - if err != nil { - return typeutil.ZeroTimestamp, err - } - //ret := make([]typeutil.Timestamp, count) - //for i:=uint32(0); i < count; i++{ - // ret[i] = start + uint64(i) - //} - return start, err -} - -func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { - return gta.GenerateTSO(1) -} - -// Reset is used to reset the TSO allocator. -func (gta *GlobalTSOAllocator) Reset() { - gta.tso.ResetTimestamp() -} - -/////////////////////////////////////////////////////////////////////// - -type IDAllocator interface { - Alloc(count uint32) (UniqueID, UniqueID, error) - AllocOne() (UniqueID, error) - UpdateID() error -} - -// GlobalTSOAllocator is the global single point TSO allocator. -type GlobalIDAllocator struct { - allocator Allocator -} - -func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { - return &GlobalIDAllocator{ - allocator: NewGlobalTSOAllocator(key, base), - } -} - -// Initialize will initialize the created global TSO allocator. -func (gia *GlobalIDAllocator) Initialize() error { - return gia.allocator.Initialize() -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. 
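
The `GlobalIDAllocator.Alloc` deleted just below turns one `GenerateTSO(count)` call into the half-open ID range `[idStart, idStart + count)`. The arithmetic is worth seeing in isolation; a minimal sketch with an atomic counter standing in for the monotonic TSO source (illustrative, not the real allocator):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <utility>

std::atomic<int64_t> g_tso{1000};  // stand-in for the monotonic TSO source

// Reserve `count` IDs in one atomic step; returns [idStart, idEnd).
std::pair<int64_t, int64_t> Alloc(uint32_t count) {
    int64_t start = g_tso.fetch_add(static_cast<int64_t>(count));
    return {start, start + static_cast<int64_t>(count)};
}

int main() {
    auto [a_start, a_end] = Alloc(3);  // e.g. [1000, 1003)
    auto [b_start, b_end] = Alloc(2);  // e.g. [1003, 1005): ranges never overlap
    std::cout << a_start << ".." << a_end << " / "
              << b_start << ".." << b_end << "\n";
}
```
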
-func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(count) - if err != nil { - return 0, 0, err - } - idStart := UniqueID(timestamp) - idEnd := idStart + int64(count) - return idStart, idEnd, nil -} - -func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(1) - if err != nil { - return 0, err - } - idStart := UniqueID(timestamp) - return idStart, nil -} - -func (gia *GlobalIDAllocator) UpdateID() error { - return gia.allocator.UpdateTSO() -} diff --git a/internal/util/rocksmq/global_rmq.go b/internal/util/rocksmq/global_rmq.go deleted file mode 100644 index a32e49dd44..0000000000 --- a/internal/util/rocksmq/global_rmq.go +++ /dev/null @@ -1,13 +0,0 @@ -package rocksmq - -var rmq *RocksMQ - -func InitRmq(rocksdbName string, idAllocator IDAllocator) error { - var err error - rmq, err = NewRocksMQ(rocksdbName, idAllocator) - return err -} - -func GetRmq() *RocksMQ { - return rmq -} diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index e5e5309131..42a92ee99f 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -7,6 +7,7 @@ import ( "github.com/tecbot/gorocksdb" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/master" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" @@ -72,7 +73,7 @@ type RocksMQ struct { kv kv.Base channels map[string]*Channel cgCtxs map[string]ConsumerGroupContext - idAllocator IDAllocator + idAllocator master.IDAllocator produceMu sync.Mutex consumeMu sync.Mutex //ctx context.Context @@ -84,7 +85,7 @@ type RocksMQ struct { //tsoTicker *time.Ticker } -func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) { +func NewRocksMQ(name string, idAllocator master.IDAllocator) (*RocksMQ, error) { bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity)) opts := gorocksdb.NewDefaultOptions() diff --git a/internal/util/rocksmq/rocksmq_test.go b/internal/util/rocksmq/rocksmq_test.go index 74bd9d2a8c..1ded31043c 100644 --- a/internal/util/rocksmq/rocksmq_test.go +++ b/internal/util/rocksmq/rocksmq_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + master "github.com/zilliztech/milvus-distributed/internal/master" "go.etcd.io/etcd/clientv3" ) @@ -19,15 +20,14 @@ func TestFixChannelName(t *testing.T) { } func TestRocksMQ(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } + master.Init() + + etcdAddr := master.Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq" @@ -76,15 +76,14 @@ func TestRocksMQ(t *testing.T) { } func TestRocksMQ_Loop(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } + master.Init() + + etcdAddr := master.Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) 
assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_1" @@ -144,15 +143,14 @@ func TestRocksMQ_Loop(t *testing.T) { } func TestRocksMQ_Goroutines(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } + master.Init() + + etcdAddr := master.Params.EtcdAddress cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_2" diff --git a/internal/util/rocksmq/tso.go b/internal/util/rocksmq/tso.go deleted file mode 100644 index 0db2901b83..0000000000 --- a/internal/util/rocksmq/tso.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2016 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package rocksmq - -import ( - "log" - "sync/atomic" - "time" - "unsafe" - - "go.uber.org/zap" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -const ( - // UpdateTimestampStep is used to update timestamp. - UpdateTimestampStep = 50 * time.Millisecond - // updateTimestampGuard is the min timestamp interval. - updateTimestampGuard = time.Millisecond - // maxLogical is the max upper limit for logical time. - // When a TSO's logical time reaches this limit, - // the physical time will be forced to increase. - maxLogical = int64(1 << 18) -) - -// atomicObject is used to store the current TSO in memory. -type atomicObject struct { - physical time.Time - logical int64 -} - -// timestampOracle is used to maintain the logic of tso. -type timestampOracle struct { - key string - kvBase kv.TxnBase - - // TODO: remove saveInterval - saveInterval time.Duration - maxResetTSGap func() time.Duration - // For tso, set after the PD becomes a leader. - TSO unsafe.Pointer - lastSavedTime atomic.Value -} - -func (t *timestampOracle) loadTimestamp() (time.Time, error) { - strData, err := t.kvBase.Load(t.key) - - var binData []byte = []byte(strData) - - if err != nil { - return typeutil.ZeroTime, err - } - if len(binData) == 0 { - return typeutil.ZeroTime, nil - } - return typeutil.ParseTimestamp(binData) -} - -// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, -// otherwise, update it. 
-func (t *timestampOracle) saveTimestamp(ts time.Time) error { - data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - err := t.kvBase.Save(t.key, string(data)) - if err != nil { - return errors.WithStack(err) - } - t.lastSavedTime.Store(ts) - return nil -} - -func (t *timestampOracle) InitTimestamp() error { - - //last, err := t.loadTimestamp() - //if err != nil { - // return err - //} - - next := time.Now() - - // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, - // the timestamp allocation will start from the saved etcd timestamp temporarily. - //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - // next = last.Add(updateTimestampGuard) - //} - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - - //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) - - current := &atomicObject{ - physical: next, - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetUserTimestamp update the physical part with specified tso. -func (t *timestampOracle) ResetUserTimestamp(tso uint64) error { - physical, _ := tsoutil.ParseTS(tso) - next := physical.Add(time.Millisecond) - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - - // do not update - if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { - return errors.New("the specified ts too small than now") - } - - if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() { - return errors.New("the specified ts too large than now") - } - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - update := &atomicObject{ - physical: next, - } - atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) - return nil -} - -// UpdateTimestamp is used to update the timestamp. -// This function will do two things: -// 1. When the logical time is going to be used up, increase the current physical time. -// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time -// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and -// we also need to save the next physical time plus `TsoSaveInterval` into etcd. -// -// Here is some constraints that this function must satisfy: -// 1. The saved time is monotonically increasing. -// 2. The physical time is monotonically increasing. -// 3. The physical time is always less than the saved timestamp. -func (t *timestampOracle) UpdateTimestamp() error { - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - now := time.Now() - - jetLag := typeutil.SubTimeByWallClock(now, prev.physical) - if jetLag > 3*UpdateTimestampStep { - log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) - } - - var next time.Time - prevLogical := atomic.LoadInt64(&prev.logical) - // If the system time is greater, it will be synchronized with the system time. - if jetLag > updateTimestampGuard { - next = now - } else if prevLogical > maxLogical/2 { - // The reason choosing maxLogical/2 here is that it's big enough for common cases. - // Because there is enough timestamp can be allocated before next update. 
- log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) - next = prev.physical.Add(time.Millisecond) - } else { - // It will still use the previous physical time to alloc the timestamp. - return nil - } - - // It is not safe to increase the physical time to `next`. - // The time window needs to be updated and saved to etcd. - if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - } - - current := &atomicObject{ - physical: next, - logical: 0, - } - - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetTimestamp is used to reset the timestamp. -func (t *timestampOracle) ResetTimestamp() { - zero := &atomicObject{ - physical: time.Now(), - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) -} diff --git a/tools/core_gen/all_generate.py b/tools/core_gen/all_generate.py index a4be00cead..9d4685a4c5 100755 --- a/tools/core_gen/all_generate.py +++ b/tools/core_gen/all_generate.py @@ -62,6 +62,10 @@ if __name__ == "__main__": 'visitor_name': "VerifyExprVisitor", "parameter_name": 'expr', }, + { + 'visitor_name': "ExtractInfoExprVisitor", + "parameter_name": 'expr', + }, ], 'PlanNode': [ { @@ -76,6 +80,10 @@ if __name__ == "__main__": 'visitor_name': "VerifyPlanNodeVisitor", "parameter_name": 'node', }, + { + 'visitor_name': "ExtractInfoPlanNodeVisitor", + "parameter_name": 'node', + }, ] } extract_extra_body(visitor_info, query_path)