diff --git a/.jenkins/modules/Build/Build.groovy b/.jenkins/modules/Build/Build.groovy index bf0958c7cd..dc5e5db542 100644 --- a/.jenkins/modules/Build/Build.groovy +++ b/.jenkins/modules/Build/Build.groovy @@ -1,6 +1,7 @@ timeout(time: 10, unit: 'MINUTES') { dir ("scripts") { - sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache files not found!\"' + sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache artfactory files not found!\"' + sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"thirdparty artfactory files not found!\"' } sh '. ./scripts/before-install.sh && make install' @@ -8,6 +9,7 @@ timeout(time: 10, unit: 'MINUTES') { dir ("scripts") { withCredentials([usernamePassword(credentialsId: "${env.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) { sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}' + sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz -u ${USERNAME} -p ${PASSWORD}' } } } diff --git a/Makefile b/Makefile index e30ba03587..75232a2044 100644 --- a/Makefile +++ b/Makefile @@ -64,12 +64,12 @@ build-go: @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null build-cpp: - @(env bash $(PWD)/scripts/core_build.sh) - @(env bash $(PWD)/scripts/cwrapper_build.sh -t Release) + @(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)") + @(env bash $(PWD)/scripts/cwrapper_build.sh -t Release -f "$(CUSTOM_THIRDPARTY_PATH)") build-cpp-with-unittest: - @(env bash $(PWD)/scripts/core_build.sh -u) - @(env bash $(PWD)/scripts/cwrapper_build.sh -t Release) + @(env bash $(PWD)/scripts/core_build.sh -u -f "$(CUSTOM_THIRDPARTY_PATH)") + @(env bash $(PWD)/scripts/cwrapper_build.sh -t Release -f "$(CUSTOM_THIRDPARTY_PATH)") # Runs the tests. unittest: test-cpp test-go diff --git a/build/ci/jenkins/Jenkinsfile b/build/ci/jenkins/Jenkinsfile index 17c3ce39a7..ed21ffee9e 100644 --- a/build/ci/jenkins/Jenkinsfile +++ b/build/ci/jenkins/Jenkinsfile @@ -38,6 +38,8 @@ pipeline { PULSAR_ADDRESS = "pulsar://127.0.0.1:6650" ETCD_ADDRESS = "127.0.0.1:2379" CCACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/ccache" + THIRDPARTY_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/thirdparty" + CUSTOM_THIRDPARTY_PATH = "${WORKSPACE}/3rdparty_download" } steps { container('build-env') { diff --git a/internal/core/src/common/Types.cpp b/internal/core/src/common/Types.cpp index 0e4aa7d35c..4854fba03e 100644 --- a/internal/core/src/common/Types.cpp +++ b/internal/core/src/common/Types.cpp @@ -38,7 +38,7 @@ static auto map = [] { MetricType GetMetricType(const std::string& type_name) { auto real_name = to_lower_copy(type_name); - AssertInfo(map.left.count(real_name), "metric type not found: (" + type_name + ")"); + AssertInfo(map.left.count(real_name), "metric type not found: " + type_name); return map.left.at(real_name); } diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index c46ee76c65..c5c94dcf5e 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -13,8 +13,6 @@ #include "utils/Types.h" #include #include -#include -#include namespace milvus { using Timestamp = uint64_t; // TODO: use TiKV-like timestamp @@ -26,15 +24,4 @@ using MetricType = faiss::MetricType; faiss::MetricType GetMetricType(const std::string& type); -// NOTE: dependent type -// used at meta-template programming -template -constexpr std::true_type always_true{}; - -template -constexpr std::false_type always_false{}; - -template -using aligned_vector = std::vector>; - } // namespace milvus diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 8e2a89cfb0..866807e551 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -70,6 +70,8 @@ to_lower(const std::string& raw) { return data; } +template +constexpr std::false_type always_false{}; template std::unique_ptr ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) { @@ -83,62 +85,31 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found"); auto op = RangeExpr::mapping_.at(op_name); - if constexpr (std::is_same_v) { - Assert(item.value().is_boolean()); - } else if constexpr (std::is_integral_v) { + if constexpr (std::is_integral_v) { Assert(item.value().is_number_integer()); } else if constexpr (std::is_floating_point_v) { Assert(item.value().is_number()); } else { static_assert(always_false, "unsupported type"); - __builtin_unreachable(); } T value = item.value(); expr->conditions_.emplace_back(op, value); } - std::sort(expr->conditions_.begin(), expr->conditions_.end()); - return expr; -} - -template -std::unique_ptr -ParseTermNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) { - auto expr = std::make_unique>(); - auto data_type = schema[field_name].get_data_type(); - Assert(body.is_array()); - expr->field_id_ = field_name; - expr->data_type_ = data_type; - for (auto& value : body) { - if constexpr (std::is_same_v) { - Assert(value.is_boolean()); - } else if constexpr (std::is_integral_v) { - Assert(value.is_number_integer()); - } else if constexpr (std::is_floating_point_v) { - Assert(value.is_number()); - } else { - static_assert(always_false, "unsupported type"); - __builtin_unreachable(); - } - T real_value = value; - expr->terms_.push_back(real_value); - } - std::sort(expr->terms_.begin(), expr->terms_.end()); return expr; } std::unique_ptr ParseRangeNode(const Schema& schema, const Json& out_body) { - Assert(out_body.is_object()); Assert(out_body.size() == 1); auto out_iter = out_body.begin(); auto field_name = out_iter.key(); auto body = out_iter.value(); auto data_type = schema[field_name].get_data_type(); Assert(!field_is_vector(data_type)); - switch (data_type) { case DataType::BOOL: { - return ParseRangeNodeImpl(schema, field_name, body); + PanicInfo("bool is not supported in Range node"); + // return ParseRangeNodeImpl(schema, field_name, body); } case DataType::INT8: return ParseRangeNodeImpl(schema, field_name, body); @@ -157,42 +128,6 @@ ParseRangeNode(const Schema& schema, const Json& out_body) { } } -static std::unique_ptr -ParseTermNode(const Schema& schema, const Json& out_body) { - Assert(out_body.size() == 1); - auto out_iter = out_body.begin(); - auto field_name = out_iter.key(); - auto body = out_iter.value(); - auto data_type = schema[field_name].get_data_type(); - Assert(!field_is_vector(data_type)); - switch (data_type) { - case DataType::BOOL: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::INT8: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::INT16: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::INT32: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::INT64: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::FLOAT: { - return ParseTermNodeImpl(schema, field_name, body); - } - case DataType::DOUBLE: { - return ParseTermNodeImpl(schema, field_name, body); - } - default: { - PanicInfo("unsupported data_type"); - } - } -} - static std::unique_ptr CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) { auto plan = std::make_unique(schema); @@ -208,10 +143,6 @@ CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) { if (pack.contains("vector")) { auto& out_body = pack.at("vector"); plan->plan_node_ = ParseVecNode(plan.get(), out_body); - } else if (pack.contains("term")) { - AssertInfo(!predicate, "unsupported complex DSL"); - auto& out_body = pack.at("term"); - predicate = ParseTermNode(schema, out_body); } else if (pack.contains("range")) { AssertInfo(!predicate, "unsupported complex DSL"); auto& out_body = pack.at("range"); diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 1d33eb18ae..453959e530 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace milvus::query { using Json = nlohmann::json; @@ -38,6 +39,9 @@ struct Plan { // TODO: add move extra info }; +template +using aligned_vector = std::vector>; + struct Placeholder { // milvus::proto::service::PlaceholderGroup group_; std::string tag_; diff --git a/internal/core/src/query/Search.cpp b/internal/core/src/query/Search.cpp index 084bb8e5de..bfaad5a37c 100644 --- a/internal/core/src/query/Search.cpp +++ b/internal/core/src/query/Search.cpp @@ -27,7 +27,7 @@ create_bitmap_view(std::optional bitmaps_opt, int64_t chunk return nullptr; } auto& bitmaps = *bitmaps_opt.value(); - auto src_vec = ~bitmaps.at(chunk_id); + auto& src_vec = bitmaps.at(chunk_id); auto dst = std::make_shared(src_vec.size()); auto iter = reinterpret_cast(dst->mutable_data()); diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h index 250d68a6e5..86464b70b0 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.h +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -58,10 +58,6 @@ class ExecExprVisitor : ExprVisitor { auto ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType; - template - auto - ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType; - private: segcore::SegmentSmallIndex& segment_; std::optional ret_; diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index d5eb3a026b..83e5782c91 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -46,10 +46,6 @@ class ExecExprVisitor : ExprVisitor { auto ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType; - template - auto - ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType; - private: segcore::SegmentSmallIndex& segment_; std::optional ret_; @@ -67,6 +63,11 @@ ExecExprVisitor::visit(BoolBinaryExpr& expr) { PanicInfo("unimplemented"); } +void +ExecExprVisitor::visit(TermExpr& expr) { + PanicInfo("unimplemented"); +} + template auto ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, IndexFunc index_func, ElementFunc element_func) @@ -83,17 +84,17 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, IndexFunc index_fu auto& indexing_record = segment_.get_indexing_record(); const segcore::ScalarIndexingEntry& entry = indexing_record.get_scalar_entry(field_offset); - RetType results(vec.num_chunk()); + RetType results(vec.chunk_size()); auto indexing_barrier = indexing_record.get_finished_ack(); for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) { auto& result = results[chunk_id]; auto indexing = entry.get_indexing(chunk_id); auto data = index_func(indexing); - result = std::move(*data); + result = ~std::move(*data); Assert(result.size() == segcore::DefaultElementPerChunk); } - for (auto chunk_id = indexing_barrier; chunk_id < vec.num_chunk(); ++chunk_id) { + for (auto chunk_id = indexing_barrier; chunk_id < vec.chunk_size(); ++chunk_id) { auto& result = results[chunk_id]; result.resize(segcore::DefaultElementPerChunk); auto chunk = vec.get_chunk(chunk_id); @@ -125,32 +126,32 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType { switch (op) { case OpType::Equal: { auto index_func = [val](Index* index) { return index->In(1, &val); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x == val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x == val); }); } case OpType::NotEqual: { auto index_func = [val](Index* index) { return index->NotIn(1, &val); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x != val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x != val); }); } case OpType::GreaterEqual: { auto index_func = [val](Index* index) { return index->Range(val, Operator::GE); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x >= val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x >= val); }); } case OpType::GreaterThan: { auto index_func = [val](Index* index) { return index->Range(val, Operator::GT); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x > val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x > val); }); } case OpType::LessEqual: { auto index_func = [val](Index* index) { return index->Range(val, Operator::LE); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x <= val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x <= val); }); } case OpType::LessThan: { auto index_func = [val](Index* index) { return index->Range(val, Operator::LT); }; - return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x < val); }); + return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x < val); }); } default: { PanicInfo("unsupported range node"); @@ -166,16 +167,16 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType { if (false) { } else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessThan)) { auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, false); }; - return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x < val2); }); + return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x < val2); }); } else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessEqual)) { auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, true); }; - return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x <= val2); }); + return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x <= val2); }); } else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessThan)) { auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, false); }; - return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x < val2); }); + return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x < val2); }); } else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessEqual)) { auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, true); }; - return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x <= val2); }); + return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x <= val2); }); } else { PanicInfo("unsupported range node"); } @@ -225,79 +226,4 @@ ExecExprVisitor::visit(RangeExpr& expr) { ret_ = std::move(ret); } -template -auto -ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType { - auto& expr = static_cast&>(expr_raw); - auto& records = segment_.get_insert_record(); - auto data_type = expr.data_type_; - auto& schema = segment_.get_schema(); - auto field_offset_opt = schema.get_offset(expr.field_id_); - Assert(field_offset_opt); - auto field_offset = field_offset_opt.value(); - auto& field_meta = schema[field_offset]; - auto vec_ptr = records.get_entity(field_offset); - auto& vec = *vec_ptr; - auto num_chunk = vec.num_chunk(); - RetType bitsets; - - auto N = records.ack_responder_.GetAck(); - - // small batch - for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) { - auto& chunk = vec.get_chunk(chunk_id); - - auto size = chunk_id == num_chunk - 1 ? N - chunk_id * segcore::DefaultElementPerChunk - : segcore::DefaultElementPerChunk; - - boost::dynamic_bitset<> bitset(segcore::DefaultElementPerChunk); - for (int i = 0; i < size; ++i) { - auto value = chunk[i]; - bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value); - bitset[i] = is_in; - } - bitsets.emplace_back(std::move(bitset)); - } - return bitsets; -} - -void -ExecExprVisitor::visit(TermExpr& expr) { - auto& field_meta = segment_.get_schema()[expr.field_id_]; - Assert(expr.data_type_ == field_meta.get_data_type()); - RetType ret; - switch (expr.data_type_) { - case DataType::BOOL: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::INT8: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::INT16: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::INT32: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::INT64: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::FLOAT: { - ret = ExecTermVisitorImpl(expr); - break; - } - case DataType::DOUBLE: { - ret = ExecTermVisitorImpl(expr); - break; - } - default: - PanicInfo("unsupported"); - } - ret_ = std::move(ret); -} } // namespace milvus::query diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index c168f811cf..3d2a43f45f 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -196,7 +196,7 @@ class ConcurrentVectorImpl : public VectorBase { } ssize_t - num_chunk() const { + chunk_size() const { return chunks_.size(); } diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/IndexingEntry.cpp index aafa54da36..a03d86ac92 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/IndexingEntry.cpp @@ -24,7 +24,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector auto source = dynamic_cast*>(vec_base); Assert(source); - auto chunk_size = source->num_chunk(); + auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); auto conf = get_build_conf(); data_.grow_to_at_least(ack_end); @@ -87,7 +87,7 @@ void ScalarIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { auto source = dynamic_cast*>(vec_base); Assert(source); - auto chunk_size = source->num_chunk(); + auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); data_.grow_to_at_least(ack_end); for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp index e08f9a290e..1091c6df25 100644 --- a/internal/core/src/segcore/SegmentNaive.cpp +++ b/internal/core/src/segcore/SegmentNaive.cpp @@ -467,16 +467,16 @@ SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto dim = field.get_dim(); auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode); - auto chunk_size = record_.uids_.num_chunk(); + auto chunk_size = record_.uids_.chunk_size(); auto& uids = record_.uids_; auto entities = record_.get_entity(offset); std::vector datasets; - for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) { + for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) { auto entities_chunk = entities->get_chunk(chunk_id).data(); - int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk - : DefaultElementPerChunk; + int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk + : DefaultElementPerChunk; datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk)); } for (auto& ds : datasets) { diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index 565182983a..cae5160ff8 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -241,10 +241,10 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto entities = record_.get_entity(offset); std::vector datasets; - for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) { + for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) { auto entities_chunk = entities->get_chunk(chunk_id).data(); - int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk - : DefaultElementPerChunk; + int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk + : DefaultElementPerChunk; datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk)); } for (auto& ds : datasets) { diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 29fef50fb7..ebf2ef375f 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -26,5 +26,4 @@ target_link_libraries(all_tests pthread milvus_utils ) - install (TARGETS all_tests DESTINATION unittest) diff --git a/internal/core/unittest/test_common.cpp b/internal/core/unittest/test_common.cpp deleted file mode 100644 index 2be5c0bee0..0000000000 --- a/internal/core/unittest/test_common.cpp +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#include diff --git a/internal/core/unittest/test_concurrent_vector.cpp b/internal/core/unittest/test_concurrent_vector.cpp index fe6687f890..ce0acfb11c 100644 --- a/internal/core/unittest/test_concurrent_vector.cpp +++ b/internal/core/unittest/test_concurrent_vector.cpp @@ -52,7 +52,7 @@ TEST(ConcurrentVector, TestSingle) { c_vec.set_data(total_count, vec.data(), insert_size); total_count += insert_size; } - ASSERT_EQ(c_vec.num_chunk(), (total_count + 31) / 32); + ASSERT_EQ(c_vec.chunk_size(), (total_count + 31) / 32); for (int i = 0; i < total_count; ++i) { for (int d = 0; d < dim; ++d) { auto std_data = d + i * dim; diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 2d968f09cf..c2a7988b25 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -321,88 +321,7 @@ TEST(Expr, TestRange) { auto ans = final[vec_id][offset]; auto val = age_col[i]; - auto ref = ref_func(val); - ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val; - } - } -} - -TEST(Expr, TestTerm) { - using namespace milvus::query; - using namespace milvus::segcore; - auto vec_2k_3k = [] { - std::string buf = "["; - for (int i = 2000; i < 3000 - 1; ++i) { - buf += std::to_string(i) + ", "; - } - buf += std::to_string(2999) + "]"; - return buf; - }(); - - std::vector>> testcases = { - {R"([2000, 3000])", [](int v) { return v == 2000 || v == 3000; }}, - {R"([2000])", [](int v) { return v == 2000; }}, - {R"([3000])", [](int v) { return v == 3000; }}, - {vec_2k_3k, [](int v) { return 2000 <= v && v < 3000; }}, - }; - - std::string dsl_string_tmp = R"( -{ - "bool": { - "must": [ - { - "term": { - "age": @@@@ - } - }, - { - "vector": { - "fakevec": { - "metric_type": "L2", - "params": { - "nprobe": 10 - }, - "query": "$0", - "topk": 10 - } - } - } - ] - } -})"; - auto schema = std::make_shared(); - schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2); - schema->AddField("age", DataType::INT32); - - auto seg = CreateSegment(schema); - int N = 10000; - std::vector age_col; - int num_iters = 100; - for (int iter = 0; iter < num_iters; ++iter) { - auto raw_data = DataGen(schema, N, iter); - auto new_age_col = raw_data.get_col(1); - age_col.insert(age_col.end(), new_age_col.begin(), new_age_col.end()); - seg->PreInsert(N); - seg->Insert(iter * N, N, raw_data.row_ids_.data(), raw_data.timestamps_.data(), raw_data.raw_); - } - - auto seg_promote = dynamic_cast(seg.get()); - ExecExprVisitor visitor(*seg_promote); - for (auto [clause, ref_func] : testcases) { - auto loc = dsl_string_tmp.find("@@@@"); - auto dsl_string = dsl_string_tmp; - dsl_string.replace(loc, 4, clause); - auto plan = CreatePlan(*schema, dsl_string); - auto final = visitor.call_child(*plan->plan_node_->predicate_.value()); - EXPECT_EQ(final.size(), upper_div(N * num_iters, DefaultElementPerChunk)); - - for (int i = 0; i < N * num_iters; ++i) { - auto vec_id = i / DefaultElementPerChunk; - auto offset = i % DefaultElementPerChunk; - auto ans = final[vec_id][offset]; - - auto val = age_col[i]; - auto ref = ref_func(val); + auto ref = !ref_func(val); ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val; } } diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 7725eb5ebd..09b1aa2900 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -31,14 +31,6 @@ struct GeneratedData { memcpy(ret.data(), target.data(), target.size()); return ret; } - template - auto - get_mutable_col(int index) { - auto& target = cols_.at(index); - assert(target.size() == row_ids_.size() * sizeof(T)); - auto ptr = reinterpret_cast(target.data()); - return ptr; - } private: GeneratedData() = default; @@ -66,9 +58,6 @@ GeneratedData::generate_rows(int N, SchemaPtr schema) { } } rows_ = std::move(result); - raw_.raw_data = rows_.data(); - raw_.sizeof_per_row = schema->get_total_sizeof(); - raw_.count = N; } inline GeneratedData @@ -140,12 +129,14 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) { } GeneratedData res; res.cols_ = std::move(cols); + res.generate_rows(N, schema); for (int i = 0; i < N; ++i) { res.row_ids_.push_back(i); res.timestamps_.push_back(i); } - - res.generate_rows(N, schema); + res.raw_.raw_data = res.rows_.data(); + res.raw_.sizeof_per_row = schema->get_total_sizeof(); + res.raw_.count = N; return std::move(res); } diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go deleted file mode 100644 index 08187ab2e2..0000000000 --- a/internal/storage/binlog_reader.go +++ /dev/null @@ -1,96 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/errors" -) - -type BinlogReader struct { - magicNumber int32 - descriptorEvent - currentEventReader *EventReader - buffer *bytes.Buffer - currentOffset int32 - isClose bool -} - -func (reader *BinlogReader) NextEventReader() (*EventReader, error) { - if reader.isClose { - return nil, errors.New("bin log reader is closed") - } - if reader.currentEventReader != nil { - reader.currentOffset = reader.currentEventReader.nextPosition - if err := reader.currentEventReader.Close(); err != nil { - return nil, err - } - reader.currentEventReader = nil - if reader.currentOffset >= int32(reader.buffer.Cap()) { - return nil, nil - } - // skip remaining bytes of this event - reader.buffer.Next(int(reader.currentOffset) - reader.buffer.Len()) - } - if reader.currentOffset >= int32(reader.buffer.Cap()) { - return nil, nil - } - eventReader, err := newEventReader(reader.descriptorEvent.payloadDataType, reader.buffer) - if err != nil { - return nil, err - } - reader.currentEventReader = eventReader - return reader.currentEventReader, nil -} - -func (reader *BinlogReader) readMagicNumber() (int32, error) { - if err := binary.Read(reader.buffer, binary.LittleEndian, reader.magicNumber); err != nil { - return -1, err - } - reader.currentOffset = 4 - if reader.magicNumber != MagicNumber { - return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(MagicNumber) + - ", actual: " + strconv.Itoa(int(reader.magicNumber))) - } - - return reader.magicNumber, nil -} - -func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) { - event, err := ReadDescriptorEvent(reader.buffer) - reader.currentOffset = reader.descriptorEvent.nextPosition - if err != nil { - return nil, err - } - reader.descriptorEvent = *event - return &reader.descriptorEvent, nil -} - -func (reader *BinlogReader) Close() error { - if reader.isClose { - return nil - } - reader.isClose = true - if reader.currentEventReader != nil { - if err := reader.currentEventReader.Close(); err != nil { - return err - } - } - return nil -} - -func NewBinlogReader(data []byte) (*BinlogReader, error) { - reader := &BinlogReader{ - buffer: bytes.NewBuffer(data), - isClose: false, - } - - if _, err := reader.readMagicNumber(); err != nil { - return nil, err - } - if _, err := reader.readDescriptorEvent(); err != nil { - return nil, err - } - return reader, nil -} diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go deleted file mode 100644 index 1b7a81f930..0000000000 --- a/internal/storage/binlog_writer.go +++ /dev/null @@ -1,299 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "errors" - - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -const ( - // todo : put to param table - ServerID = 1 - BinlogVersion = 1 - CommitID = 1 - ServerVersion = 1 - HeaderLength = 17 -) - -type BinlogType int32 - -const ( - InsertBinlog BinlogType = iota - DeleteBinlog - DDLBinlog -) -const ( - MagicNumber = 0xfffabc -) - -type baseBinlogWriter struct { - descriptorEvent - magicNumber int32 - binlogType BinlogType - eventWriters []EventWriter - currentEventWriter EventWriter - buffer *bytes.Buffer - numEvents int32 - numRows int32 - isClose bool - offset int32 -} - -func (writer *baseBinlogWriter) checkClose() error { - if writer.isClose { - return errors.New("insert binlog writer is already closed") - } - return nil -} - -func (writer *baseBinlogWriter) appendEventWriter() error { - if writer.currentEventWriter != nil { - if err := writer.currentEventWriter.Finish(); err != nil { - return err - } - - writer.eventWriters = append(writer.eventWriters, writer.currentEventWriter) - length, err := writer.currentEventWriter.GetMemoryUsageInBytes() - if err != nil { - return err - } - writer.offset += length - writer.numEvents++ - nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter() - if err != nil { - return err - } - writer.numRows += int32(nums) - writer.currentEventWriter = nil - } - return nil -} - -func (writer *baseBinlogWriter) GetEventNums() int32 { - return writer.numEvents -} - -func (writer *baseBinlogWriter) GetRowNums() (int32, error) { - var res = writer.numRows - if writer.currentEventWriter != nil { - nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter() - if err != nil { - } - res += int32(nums) - } - return res, nil -} - -func (writer *baseBinlogWriter) GetBinlogType() BinlogType { - return writer.binlogType -} - -// GetBuffer get binlog buffer. Return nil if binlog is not finished yet. -func (writer *baseBinlogWriter) GetBuffer() []byte { - if writer.buffer != nil { - return writer.buffer.Bytes() - } - return nil -} - -// Close allocate buffer and release resource -func (writer *baseBinlogWriter) Close() error { - if writer.isClose { - return nil - } - writer.isClose = true - if err := writer.appendEventWriter(); err != nil { - return err - } - writer.buffer = new(bytes.Buffer) - if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil { - return err - } - if err := writer.descriptorEvent.Write(writer.buffer); err != nil { - return err - } - for _, w := range writer.eventWriters { - if err := w.Write(writer.buffer); err != nil { - return err - } - } - - // close all writers - for _, e := range writer.eventWriters { - if err := e.Close(); err != nil { - return err - } - } - return nil -} - -type InsertBinlogWriter struct { - baseBinlogWriter -} - -func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newInsertEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -type DeleteBinlogWriter struct { - baseBinlogWriter -} - -func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDeleteEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -type DDLBinlogWriter struct { - baseBinlogWriter -} - -func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newCreateCollectionEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDropCollectionEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newCreatePartitionEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err - } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDropPartitionEventWriter(writer.payloadDataType, writer.offset) - if err != nil { - return nil, err - } - writer.currentEventWriter = event - - return event, nil -} - -func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter { - descriptorEvent := newDescriptorEvent() - descriptorEvent.payloadDataType = dataType - return &InsertBinlogWriter{ - baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, - magicNumber: MagicNumber, - binlogType: InsertBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.descriptorEventData.GetMemoryUsageInBytes(), - }, - } -} -func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter { - descriptorEvent := newDescriptorEvent() - descriptorEvent.payloadDataType = dataType - return &DeleteBinlogWriter{ - baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, - magicNumber: MagicNumber, - binlogType: DeleteBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.descriptorEventData.GetMemoryUsageInBytes(), - }, - } -} -func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter { - descriptorEvent := newDescriptorEvent() - descriptorEvent.payloadDataType = dataType - return &DDLBinlogWriter{ - baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, - magicNumber: MagicNumber, - binlogType: DDLBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.descriptorEventData.GetMemoryUsageInBytes(), - }, - } -} diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go deleted file mode 100644 index ef87170887..0000000000 --- a/internal/storage/binlog_writer_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package storage - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -func TestBinlogWriter(t *testing.T) { - binlogWriter := NewInsertBinlogWriter(schemapb.DataType_INT32) - defer binlogWriter.Close() - eventWriter, err := binlogWriter.NextInsertEventWriter() - assert.Nil(t, err) - err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) - assert.Nil(t, err) - assert.Nil(t, nil, binlogWriter.GetBuffer()) - err = binlogWriter.Close() - assert.Nil(t, err) - assert.EqualValues(t, 1, binlogWriter.GetEventNums()) - nums, err := binlogWriter.GetRowNums() - assert.Nil(t, err) - assert.EqualValues(t, 3, nums) - err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) - assert.NotNil(t, err) - nums, err = binlogWriter.GetRowNums() - assert.Nil(t, err) - assert.EqualValues(t, 3, nums) -} diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go deleted file mode 100644 index 6d598a066e..0000000000 --- a/internal/storage/event_data.go +++ /dev/null @@ -1,347 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "io" - - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type descriptorEventData struct { - descriptorEventDataFixPart - postHeaderLengths []uint8 -} - -type descriptorEventDataFixPart struct { - binlogVersion int16 - serverVersion int64 - commitID int64 - headerLength int8 - collectionID int64 - partitionID int64 - segmentID int64 - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp - payloadDataType schemapb.DataType -} - -func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) { - data.startTimestamp = ts -} - -func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) { - data.endTimestamp = ts -} - -func (data *descriptorEventData) GetMemoryUsageInBytes() int32 { - buf := new(bytes.Buffer) - _ = data.Write(buf) - return int32(buf.Len()) -} - -func (data *descriptorEventData) Write(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.descriptorEventDataFixPart); err != nil { - return err - } - - if err := binary.Write(buffer, binary.LittleEndian, data.postHeaderLengths); err != nil { - return err - } - return nil -} - -func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { - event := newDescriptorEventData() - - if err := binary.Read(buffer, binary.LittleEndian, &event.descriptorEventDataFixPart); err != nil { - return nil, err - } - - if err := binary.Read(buffer, binary.LittleEndian, &event.postHeaderLengths); err != nil { - return nil, err - } - - return &event, nil -} - -type eventData interface { - GetEventDataSize() int32 - WriteEventData(buffer io.Writer) error -} - -// all event types' fixed part only have start timestamp and end timestamp yet, but maybe different events will -// have different fields later, so we just create a event data struct per event type. -type insertEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *insertEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *insertEventData) WriteEventData(buffer io.Writer) error { - return binary.Write(buffer, binary.LittleEndian, data) -} - -type deleteEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *deleteEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *deleteEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.startTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.endTimestamp); err != nil { - return err - } - return nil -} - -type createCollectionEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *createCollectionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.startTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.endTimestamp); err != nil { - return err - } - return nil -} - -type dropCollectionEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *dropCollectionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.startTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.endTimestamp); err != nil { - return err - } - return nil -} - -type createPartitionEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *createPartitionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.startTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.endTimestamp); err != nil { - return err - } - return nil -} - -type dropPartitionEventData struct { - startTimestamp typeutil.Timestamp - endTimestamp typeutil.Timestamp -} - -func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.startTimestamp = timestamp -} - -func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.endTimestamp = timestamp -} - -func (data *dropPartitionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) -} - -func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.startTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.endTimestamp); err != nil { - return err - } - return nil -} - -func newDescriptorEventData() descriptorEventData { - data := descriptorEventData{ - descriptorEventDataFixPart: descriptorEventDataFixPart{ - binlogVersion: BinlogVersion, - serverVersion: ServerVersion, - commitID: CommitID, - collectionID: -1, - partitionID: -1, - segmentID: -1, - startTimestamp: 0, - endTimestamp: 0, - payloadDataType: -1, - }, - postHeaderLengths: []uint8{16, 16, 16, 16, 16, 16}, - } - data.headerLength = int8(data.GetMemoryUsageInBytes()) - return data -} - -func newInsertEventData() insertEventData { - return insertEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} -func newDeleteEventData() deleteEventData { - return deleteEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} -func newCreateCollectionEventData() createCollectionEventData { - return createCollectionEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} -func newDropCollectionEventData() dropCollectionEventData { - return dropCollectionEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} -func newCreatePartitionEventData() createPartitionEventData { - return createPartitionEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} -func newDropPartitionEventData() dropPartitionEventData { - return dropPartitionEventData{ - startTimestamp: 0, - endTimestamp: 0, - } -} - -func readInsertEventData(buffer io.Reader) (*insertEventData, error) { - data := &insertEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} - -func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) { - data := &deleteEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} - -func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData, error) { - data := &createCollectionEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} - -func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, error) { - data := &dropCollectionEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} - -func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, error) { - data := &createPartitionEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} - -func readDropPartitionEventData(buffer io.Reader) (*dropPartitionEventData, error) { - data := &dropPartitionEventData{} - if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { - return nil, err - } - return data, nil -} diff --git a/internal/storage/event_header.go b/internal/storage/event_header.go deleted file mode 100644 index f0b64f52bf..0000000000 --- a/internal/storage/event_header.go +++ /dev/null @@ -1,80 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "io" - "time" - - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type baseEventHeader struct { - timestamp typeutil.Timestamp - typeCode EventTypeCode - serverID int32 - eventLength int32 - nextPosition int32 -} - -func (header *baseEventHeader) GetMemoryUsageInBytes() int32 { - buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, header) - return int32(buf.Len()) -} - -func (header *baseEventHeader) TypeCode() EventTypeCode { - return header.typeCode -} - -func (header *baseEventHeader) Write(buffer io.Writer) error { - return binary.Write(buffer, binary.LittleEndian, header) -} - -type descriptorEventHeader = baseEventHeader - -type eventHeader struct { - baseEventHeader -} - -func readEventHeader(buffer io.Reader) (*eventHeader, error) { - header := &eventHeader{} - if err := binary.Read(buffer, binary.LittleEndian, header); err != nil { - return nil, err - } - - return header, nil -} - -func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error) { - header := &descriptorEventHeader{} - if err := binary.Read(buffer, binary.LittleEndian, header); err != nil { - return nil, err - } - return header, nil -} - -func newDescriptorEventHeader() descriptorEventHeader { - header := descriptorEventHeader{ - timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), - typeCode: DescriptorEventType, - serverID: ServerID, - } - header.eventLength = header.GetMemoryUsageInBytes() - header.nextPosition = header.eventLength + 4 - return header -} - -func newEventHeader(eventTypeCode EventTypeCode) eventHeader { - return eventHeader{ - baseEventHeader: baseEventHeader{ - timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), - typeCode: eventTypeCode, - serverID: ServerID, - eventLength: -1, - nextPosition: -1, - }, - } -} diff --git a/internal/storage/event_reader.go b/internal/storage/event_reader.go deleted file mode 100644 index b70ac63ef8..0000000000 --- a/internal/storage/event_reader.go +++ /dev/null @@ -1,99 +0,0 @@ -package storage - -import ( - "bytes" - "errors" - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -type EventReader struct { - eventHeader - eventData - PayloadReaderInterface - buffer *bytes.Buffer - isClosed bool -} - -func (reader *EventReader) checkClose() error { - if reader.isClosed { - return errors.New("event reader is closed") - } - return nil -} - -func (reader *EventReader) readHeader() (*eventHeader, error) { - if err := reader.checkClose(); err != nil { - return nil, err - } - header, err := readEventHeader(reader.buffer) - if err != nil { - return nil, err - } - reader.eventHeader = *header - return &reader.eventHeader, nil -} - -func (reader *EventReader) readData() (eventData, error) { - if err := reader.checkClose(); err != nil { - return nil, err - } - var data eventData - var err error - switch reader.TypeCode() { - case InsertEventType: - data, err = readInsertEventData(reader.buffer) - case DeleteEventType: - data, err = readDeleteEventData(reader.buffer) - case CreateCollectionEventType: - data, err = readCreateCollectionEventData(reader.buffer) - case DropCollectionEventType: - data, err = readDropCollectionEventData(reader.buffer) - case CreatePartitionEventType: - data, err = readCreatePartitionEventData(reader.buffer) - case DropPartitionEventType: - data, err = readDropPartitionEventData(reader.buffer) - default: - return nil, errors.New("unknown header type code: " + strconv.Itoa(int(reader.TypeCode()))) - } - - if err != nil { - return nil, err - } - - reader.eventData = data - return reader.eventData, nil -} - -func (reader *EventReader) Close() error { - if !reader.isClosed { - reader.isClosed = true - return reader.PayloadReaderInterface.Close() - } - return nil -} -func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) { - reader := &EventReader{ - eventHeader: eventHeader{ - baseEventHeader{}, - }, - buffer: buffer, - isClosed: false, - } - - if _, err := reader.readHeader(); err != nil { - return nil, err - } - - if _, err := reader.readData(); err != nil { - return nil, err - } - - payloadReader, err := NewPayloadReader(datatype, buffer.Bytes()) - if err != nil { - return nil, err - } - reader.PayloadReaderInterface = payloadReader - return reader, nil -} diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go deleted file mode 100644 index 940c805159..0000000000 --- a/internal/storage/event_writer.go +++ /dev/null @@ -1,289 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "io" - - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -type EventTypeCode int8 - -const ( - DescriptorEventType EventTypeCode = iota - InsertEventType - DeleteEventType - CreateCollectionEventType - DropCollectionEventType - CreatePartitionEventType - DropPartitionEventType -) - -func (code EventTypeCode) String() string { - codes := []string{"DescriptorEventType", "InsertEventType", "DeleteEventType", "CreateCollectionEventType", "DropCollectionEventType", - "CreatePartitionEventType", "DropPartitionEventType"} - if len(codes) < int(code) { - return "" - } - return codes[code] -} - -type descriptorEvent struct { - descriptorEventHeader - descriptorEventData -} - -func (event *descriptorEvent) Write(buffer io.Writer) error { - if err := event.descriptorEventHeader.Write(buffer); err != nil { - return err - } - if err := event.descriptorEventData.Write(buffer); err != nil { - return err - } - return nil -} - -func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error) { - header, err := readDescriptorEventHeader(buffer) - if err != nil { - return nil, err - } - data, err := readDescriptorEventData(buffer) - if err != nil { - return nil, err - } - return &descriptorEvent{ - descriptorEventHeader: *header, - descriptorEventData: *data, - }, nil -} - -type EventWriter interface { - PayloadWriterInterface - // Finish set meta in header and no data can be added to event writer - Finish() error - // Close release resources - Close() error - // Write serialize to buffer, should call Finish first - Write(buffer *bytes.Buffer) error - GetMemoryUsageInBytes() (int32, error) -} - -type baseEventWriter struct { - eventHeader - PayloadWriterInterface - isClosed bool - isFinish bool - offset int32 - getEventDataSize func() int32 - writeEventData func(buffer io.Writer) error -} - -func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) { - data, err := writer.GetPayloadBufferFromWriter() - if err != nil { - return -1, err - } - return writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + - int32(len(data)), nil -} - -func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error { - if err := writer.eventHeader.Write(buffer); err != nil { - return err - } - if err := writer.writeEventData(buffer); err != nil { - return err - } - - data, err := writer.GetPayloadBufferFromWriter() - if err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data); err != nil { - return err - } - return nil -} - -func (writer *baseEventWriter) Finish() error { - if !writer.isFinish { - writer.isFinish = true - if err := writer.FinishPayloadWriter(); err != nil { - return err - } - eventLength, err := writer.GetMemoryUsageInBytes() - if err != nil { - return err - } - writer.eventLength = eventLength - writer.nextPosition = eventLength + writer.offset - - } - return nil -} - -func (writer *baseEventWriter) Close() error { - if !writer.isClosed { - writer.isFinish = true - writer.isClosed = true - if err := writer.ReleasePayloadWriter(); err != nil { - return err - } - } - return nil -} - -type insertEventWriter struct { - baseEventWriter - insertEventData -} - -type deleteEventWriter struct { - baseEventWriter - deleteEventData -} - -type createCollectionEventWriter struct { - baseEventWriter - createCollectionEventData -} - -type dropCollectionEventWriter struct { - baseEventWriter - dropCollectionEventData -} - -type createPartitionEventWriter struct { - baseEventWriter - createPartitionEventData -} - -type dropPartitionEventWriter struct { - baseEventWriter - dropPartitionEventData -} - -func newDescriptorEvent() descriptorEvent { - return descriptorEvent{ - descriptorEventHeader: newDescriptorEventHeader(), - descriptorEventData: newDescriptorEventData(), - } -} - -func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &insertEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(InsertEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - insertEventData: newInsertEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData - return writer, nil -} - -func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &deleteEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DeleteEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - deleteEventData: newDeleteEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData - return writer, nil -} -func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*createCollectionEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &createCollectionEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(CreateCollectionEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - createCollectionEventData: newCreateCollectionEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData - return writer, nil -} -func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dropCollectionEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &dropCollectionEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DropCollectionEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - dropCollectionEventData: newDropCollectionEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData - return writer, nil -} -func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*createPartitionEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &createPartitionEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(CreatePartitionEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - createPartitionEventData: newCreatePartitionEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData - return writer, nil -} -func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dropPartitionEventWriter, error) { - payloadWriter, err := NewPayloadWriter(dataType) - if err != nil { - return nil, err - } - writer := &dropPartitionEventWriter{ - baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DropPartitionEventType), - PayloadWriterInterface: payloadWriter, - isClosed: false, - isFinish: false, - offset: offset, - }, - dropPartitionEventData: newDropPartitionEventData(), - } - writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataSize - writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData - return writer, nil -} diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go deleted file mode 100644 index bf9685392b..0000000000 --- a/internal/storage/event_writer_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package storage - -import ( - "bytes" - "testing" - - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - - "github.com/stretchr/testify/assert" -) - -func TestEventWriter(t *testing.T) { - insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0) - assert.Nil(t, err) - defer insertEvent.Close() - err = insertEvent.Close() - assert.Nil(t, err) - - insertEvent, err = newInsertEventWriter(schemapb.DataType_INT32, 0) - assert.Nil(t, err) - err = insertEvent.AddInt64ToPayload([]int64{1, 1}) - assert.NotNil(t, err) - err = insertEvent.AddInt32ToPayload([]int32{1, 2, 3}) - assert.Nil(t, err) - nums, err := insertEvent.GetPayloadLengthFromWriter() - assert.Nil(t, err) - assert.EqualValues(t, 3, nums) - err = insertEvent.Finish() - assert.Nil(t, err) - length, err := insertEvent.GetMemoryUsageInBytes() - assert.Nil(t, err) - assert.EqualValues(t, length, insertEvent.eventLength) - err = insertEvent.AddInt32ToPayload([]int32{1}) - assert.NotNil(t, err) - buffer := new(bytes.Buffer) - err = insertEvent.Write(buffer) - assert.Nil(t, err) - length, err = insertEvent.GetMemoryUsageInBytes() - assert.Nil(t, err) - assert.EqualValues(t, length, buffer.Len()) - err = insertEvent.Close() - assert.Nil(t, err) - -} diff --git a/internal/storage/payload.go b/internal/storage/payload.go index 58a93592c3..3240daa6fa 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -9,8 +9,6 @@ package storage */ import "C" import ( - "fmt" - "strconv" "unsafe" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -18,41 +16,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) -type PayloadWriterInterface interface { - AddDataToPayload(msgs interface{}, dim ...int) error - AddBoolToPayload(msgs []bool) error - AddInt8ToPayload(msgs []int8) error - AddInt16ToPayload(msgs []int16) error - AddInt32ToPayload(msgs []int32) error - AddInt64ToPayload(msgs []int64) error - AddFloatToPayload(msgs []float32) error - AddDoubleToPayload(msgs []float64) error - AddOneStringToPayload(msgs string) error - AddBinaryVectorToPayload(binVec []byte, dim int) error - AddFloatVectorToPayload(binVec []float32, dim int) error - FinishPayloadWriter() error - GetPayloadBufferFromWriter() ([]byte, error) - GetPayloadLengthFromWriter() (int, error) - ReleasePayloadWriter() error - Close() error -} - -type PayloadReaderInterface interface { - GetDataFromPayload(idx ...int) (interface{}, int, error) - GetBoolFromPayload() ([]bool, error) - GetInt8FromPayload() ([]int8, error) - GetInt16FromPayload() ([]int16, error) - GetInt32FromPayload() ([]int32, error) - GetInt64FromPayload() ([]int64, error) - GetFloatFromPayload() ([]float32, error) - GetDoubleFromPayload() ([]float64, error) - GetOneStringFromPayload(idx int) (string, error) - GetBinaryVectorFromPayload() ([]byte, int, error) - GetFloatVectorFromPayload() ([]float32, int, error) - GetPayloadLengthFromReader() (int, error) - ReleasePayloadReader() error - Close() error -} type ( PayloadWriter struct { payloadWriterPtr C.CPayloadWriter @@ -382,7 +345,6 @@ func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) { cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr) pointer := unsafe.Pointer(cb.data) length := int(cb.length) - fmt.Print("payload length: " + strconv.Itoa(length)) if length <= 0 { return nil, errors.New("empty buffer") }