mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Fix binlog reader bug
Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
d09ad77fb2
commit
ed54d3e26c
@ -38,7 +38,7 @@ static auto map = [] {
|
|||||||
MetricType
|
MetricType
|
||||||
GetMetricType(const std::string& type_name) {
|
GetMetricType(const std::string& type_name) {
|
||||||
auto real_name = to_lower_copy(type_name);
|
auto real_name = to_lower_copy(type_name);
|
||||||
AssertInfo(map.left.count(real_name), "metric type not found: " + type_name);
|
AssertInfo(map.left.count(real_name), "metric type not found: (" + type_name + ")");
|
||||||
return map.left.at(real_name);
|
return map.left.at(real_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -13,6 +13,8 @@
|
|||||||
#include "utils/Types.h"
|
#include "utils/Types.h"
|
||||||
#include <faiss/MetricType.h>
|
#include <faiss/MetricType.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <boost/align/aligned_allocator.hpp>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace milvus {
|
namespace milvus {
|
||||||
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
|
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
|
||||||
@ -24,4 +26,15 @@ using MetricType = faiss::MetricType;
|
|||||||
faiss::MetricType
|
faiss::MetricType
|
||||||
GetMetricType(const std::string& type);
|
GetMetricType(const std::string& type);
|
||||||
|
|
||||||
|
// NOTE: dependent type
|
||||||
|
// used at meta-template programming
|
||||||
|
template <class...>
|
||||||
|
constexpr std::true_type always_true{};
|
||||||
|
|
||||||
|
template <class...>
|
||||||
|
constexpr std::false_type always_false{};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 512>>;
|
||||||
|
|
||||||
} // namespace milvus
|
} // namespace milvus
|
||||||
|
|||||||
@ -70,8 +70,6 @@ to_lower(const std::string& raw) {
|
|||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class...>
|
|
||||||
constexpr std::false_type always_false{};
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
std::unique_ptr<Expr>
|
std::unique_ptr<Expr>
|
||||||
ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
|
ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
|
||||||
@ -85,31 +83,62 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js
|
|||||||
|
|
||||||
AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found");
|
AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found");
|
||||||
auto op = RangeExpr::mapping_.at(op_name);
|
auto op = RangeExpr::mapping_.at(op_name);
|
||||||
if constexpr (std::is_integral_v<T>) {
|
if constexpr (std::is_same_v<T, bool>) {
|
||||||
|
Assert(item.value().is_boolean());
|
||||||
|
} else if constexpr (std::is_integral_v<T>) {
|
||||||
Assert(item.value().is_number_integer());
|
Assert(item.value().is_number_integer());
|
||||||
} else if constexpr (std::is_floating_point_v<T>) {
|
} else if constexpr (std::is_floating_point_v<T>) {
|
||||||
Assert(item.value().is_number());
|
Assert(item.value().is_number());
|
||||||
} else {
|
} else {
|
||||||
static_assert(always_false<T>, "unsupported type");
|
static_assert(always_false<T>, "unsupported type");
|
||||||
|
__builtin_unreachable();
|
||||||
}
|
}
|
||||||
T value = item.value();
|
T value = item.value();
|
||||||
expr->conditions_.emplace_back(op, value);
|
expr->conditions_.emplace_back(op, value);
|
||||||
}
|
}
|
||||||
|
std::sort(expr->conditions_.begin(), expr->conditions_.end());
|
||||||
|
return expr;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
std::unique_ptr<Expr>
|
||||||
|
ParseTermNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
|
||||||
|
auto expr = std::make_unique<TermExprImpl<T>>();
|
||||||
|
auto data_type = schema[field_name].get_data_type();
|
||||||
|
Assert(body.is_array());
|
||||||
|
expr->field_id_ = field_name;
|
||||||
|
expr->data_type_ = data_type;
|
||||||
|
for (auto& value : body) {
|
||||||
|
if constexpr (std::is_same_v<T, bool>) {
|
||||||
|
Assert(value.is_boolean());
|
||||||
|
} else if constexpr (std::is_integral_v<T>) {
|
||||||
|
Assert(value.is_number_integer());
|
||||||
|
} else if constexpr (std::is_floating_point_v<T>) {
|
||||||
|
Assert(value.is_number());
|
||||||
|
} else {
|
||||||
|
static_assert(always_false<T>, "unsupported type");
|
||||||
|
__builtin_unreachable();
|
||||||
|
}
|
||||||
|
T real_value = value;
|
||||||
|
expr->terms_.push_back(real_value);
|
||||||
|
}
|
||||||
|
std::sort(expr->terms_.begin(), expr->terms_.end());
|
||||||
return expr;
|
return expr;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<Expr>
|
std::unique_ptr<Expr>
|
||||||
ParseRangeNode(const Schema& schema, const Json& out_body) {
|
ParseRangeNode(const Schema& schema, const Json& out_body) {
|
||||||
|
Assert(out_body.is_object());
|
||||||
Assert(out_body.size() == 1);
|
Assert(out_body.size() == 1);
|
||||||
auto out_iter = out_body.begin();
|
auto out_iter = out_body.begin();
|
||||||
auto field_name = out_iter.key();
|
auto field_name = out_iter.key();
|
||||||
auto body = out_iter.value();
|
auto body = out_iter.value();
|
||||||
auto data_type = schema[field_name].get_data_type();
|
auto data_type = schema[field_name].get_data_type();
|
||||||
Assert(!field_is_vector(data_type));
|
Assert(!field_is_vector(data_type));
|
||||||
|
|
||||||
switch (data_type) {
|
switch (data_type) {
|
||||||
case DataType::BOOL: {
|
case DataType::BOOL: {
|
||||||
PanicInfo("bool is not supported in Range node");
|
return ParseRangeNodeImpl<bool>(schema, field_name, body);
|
||||||
// return ParseRangeNodeImpl<bool>(schema, field_name, body);
|
|
||||||
}
|
}
|
||||||
case DataType::INT8:
|
case DataType::INT8:
|
||||||
return ParseRangeNodeImpl<int8_t>(schema, field_name, body);
|
return ParseRangeNodeImpl<int8_t>(schema, field_name, body);
|
||||||
@ -128,6 +157,42 @@ ParseRangeNode(const Schema& schema, const Json& out_body) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static std::unique_ptr<Expr>
|
||||||
|
ParseTermNode(const Schema& schema, const Json& out_body) {
|
||||||
|
Assert(out_body.size() == 1);
|
||||||
|
auto out_iter = out_body.begin();
|
||||||
|
auto field_name = out_iter.key();
|
||||||
|
auto body = out_iter.value();
|
||||||
|
auto data_type = schema[field_name].get_data_type();
|
||||||
|
Assert(!field_is_vector(data_type));
|
||||||
|
switch (data_type) {
|
||||||
|
case DataType::BOOL: {
|
||||||
|
return ParseTermNodeImpl<bool>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::INT8: {
|
||||||
|
return ParseTermNodeImpl<int8_t>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::INT16: {
|
||||||
|
return ParseTermNodeImpl<int16_t>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::INT32: {
|
||||||
|
return ParseTermNodeImpl<int32_t>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::INT64: {
|
||||||
|
return ParseTermNodeImpl<int64_t>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::FLOAT: {
|
||||||
|
return ParseTermNodeImpl<float>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
case DataType::DOUBLE: {
|
||||||
|
return ParseTermNodeImpl<double>(schema, field_name, body);
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
PanicInfo("unsupported data_type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static std::unique_ptr<Plan>
|
static std::unique_ptr<Plan>
|
||||||
CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
|
CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
|
||||||
auto plan = std::make_unique<Plan>(schema);
|
auto plan = std::make_unique<Plan>(schema);
|
||||||
@ -143,6 +208,10 @@ CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
|
|||||||
if (pack.contains("vector")) {
|
if (pack.contains("vector")) {
|
||||||
auto& out_body = pack.at("vector");
|
auto& out_body = pack.at("vector");
|
||||||
plan->plan_node_ = ParseVecNode(plan.get(), out_body);
|
plan->plan_node_ = ParseVecNode(plan.get(), out_body);
|
||||||
|
} else if (pack.contains("term")) {
|
||||||
|
AssertInfo(!predicate, "unsupported complex DSL");
|
||||||
|
auto& out_body = pack.at("term");
|
||||||
|
predicate = ParseTermNode(schema, out_body);
|
||||||
} else if (pack.contains("range")) {
|
} else if (pack.contains("range")) {
|
||||||
AssertInfo(!predicate, "unsupported complex DSL");
|
AssertInfo(!predicate, "unsupported complex DSL");
|
||||||
auto& out_body = pack.at("range");
|
auto& out_body = pack.at("range");
|
||||||
|
|||||||
@ -20,7 +20,6 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/align/aligned_allocator.hpp>
|
|
||||||
|
|
||||||
namespace milvus::query {
|
namespace milvus::query {
|
||||||
using Json = nlohmann::json;
|
using Json = nlohmann::json;
|
||||||
@ -39,9 +38,6 @@ struct Plan {
|
|||||||
// TODO: add move extra info
|
// TODO: add move extra info
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 512>>;
|
|
||||||
|
|
||||||
struct Placeholder {
|
struct Placeholder {
|
||||||
// milvus::proto::service::PlaceholderGroup group_;
|
// milvus::proto::service::PlaceholderGroup group_;
|
||||||
std::string tag_;
|
std::string tag_;
|
||||||
|
|||||||
@ -27,7 +27,7 @@ create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
auto& bitmaps = *bitmaps_opt.value();
|
auto& bitmaps = *bitmaps_opt.value();
|
||||||
auto& src_vec = bitmaps.at(chunk_id);
|
auto src_vec = ~bitmaps.at(chunk_id);
|
||||||
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
|
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
|
||||||
auto iter = reinterpret_cast<BitmapChunk::block_type*>(dst->mutable_data());
|
auto iter = reinterpret_cast<BitmapChunk::block_type*>(dst->mutable_data());
|
||||||
|
|
||||||
|
|||||||
@ -58,6 +58,10 @@ class ExecExprVisitor : ExprVisitor {
|
|||||||
auto
|
auto
|
||||||
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
|
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
auto
|
||||||
|
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
segcore::SegmentSmallIndex& segment_;
|
segcore::SegmentSmallIndex& segment_;
|
||||||
std::optional<RetType> ret_;
|
std::optional<RetType> ret_;
|
||||||
|
|||||||
@ -46,6 +46,10 @@ class ExecExprVisitor : ExprVisitor {
|
|||||||
auto
|
auto
|
||||||
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
|
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
auto
|
||||||
|
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
segcore::SegmentSmallIndex& segment_;
|
segcore::SegmentSmallIndex& segment_;
|
||||||
std::optional<RetType> ret_;
|
std::optional<RetType> ret_;
|
||||||
@ -63,11 +67,6 @@ ExecExprVisitor::visit(BoolBinaryExpr& expr) {
|
|||||||
PanicInfo("unimplemented");
|
PanicInfo("unimplemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
ExecExprVisitor::visit(TermExpr& expr) {
|
|
||||||
PanicInfo("unimplemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T, typename IndexFunc, typename ElementFunc>
|
template <typename T, typename IndexFunc, typename ElementFunc>
|
||||||
auto
|
auto
|
||||||
ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_func, ElementFunc element_func)
|
ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_func, ElementFunc element_func)
|
||||||
@ -84,17 +83,17 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_fu
|
|||||||
auto& indexing_record = segment_.get_indexing_record();
|
auto& indexing_record = segment_.get_indexing_record();
|
||||||
const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset);
|
const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset);
|
||||||
|
|
||||||
RetType results(vec.chunk_size());
|
RetType results(vec.num_chunk());
|
||||||
auto indexing_barrier = indexing_record.get_finished_ack();
|
auto indexing_barrier = indexing_record.get_finished_ack();
|
||||||
for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) {
|
for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) {
|
||||||
auto& result = results[chunk_id];
|
auto& result = results[chunk_id];
|
||||||
auto indexing = entry.get_indexing(chunk_id);
|
auto indexing = entry.get_indexing(chunk_id);
|
||||||
auto data = index_func(indexing);
|
auto data = index_func(indexing);
|
||||||
result = ~std::move(*data);
|
result = std::move(*data);
|
||||||
Assert(result.size() == segcore::DefaultElementPerChunk);
|
Assert(result.size() == segcore::DefaultElementPerChunk);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto chunk_id = indexing_barrier; chunk_id < vec.chunk_size(); ++chunk_id) {
|
for (auto chunk_id = indexing_barrier; chunk_id < vec.num_chunk(); ++chunk_id) {
|
||||||
auto& result = results[chunk_id];
|
auto& result = results[chunk_id];
|
||||||
result.resize(segcore::DefaultElementPerChunk);
|
result.resize(segcore::DefaultElementPerChunk);
|
||||||
auto chunk = vec.get_chunk(chunk_id);
|
auto chunk = vec.get_chunk(chunk_id);
|
||||||
@ -126,32 +125,32 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType {
|
|||||||
switch (op) {
|
switch (op) {
|
||||||
case OpType::Equal: {
|
case OpType::Equal: {
|
||||||
auto index_func = [val](Index* index) { return index->In(1, &val); };
|
auto index_func = [val](Index* index) { return index->In(1, &val); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x == val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x == val); });
|
||||||
}
|
}
|
||||||
|
|
||||||
case OpType::NotEqual: {
|
case OpType::NotEqual: {
|
||||||
auto index_func = [val](Index* index) { return index->NotIn(1, &val); };
|
auto index_func = [val](Index* index) { return index->NotIn(1, &val); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x != val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x != val); });
|
||||||
}
|
}
|
||||||
|
|
||||||
case OpType::GreaterEqual: {
|
case OpType::GreaterEqual: {
|
||||||
auto index_func = [val](Index* index) { return index->Range(val, Operator::GE); };
|
auto index_func = [val](Index* index) { return index->Range(val, Operator::GE); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x >= val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x >= val); });
|
||||||
}
|
}
|
||||||
|
|
||||||
case OpType::GreaterThan: {
|
case OpType::GreaterThan: {
|
||||||
auto index_func = [val](Index* index) { return index->Range(val, Operator::GT); };
|
auto index_func = [val](Index* index) { return index->Range(val, Operator::GT); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x > val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x > val); });
|
||||||
}
|
}
|
||||||
|
|
||||||
case OpType::LessEqual: {
|
case OpType::LessEqual: {
|
||||||
auto index_func = [val](Index* index) { return index->Range(val, Operator::LE); };
|
auto index_func = [val](Index* index) { return index->Range(val, Operator::LE); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x <= val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x <= val); });
|
||||||
}
|
}
|
||||||
|
|
||||||
case OpType::LessThan: {
|
case OpType::LessThan: {
|
||||||
auto index_func = [val](Index* index) { return index->Range(val, Operator::LT); };
|
auto index_func = [val](Index* index) { return index->Range(val, Operator::LT); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x < val); });
|
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x < val); });
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
PanicInfo("unsupported range node");
|
PanicInfo("unsupported range node");
|
||||||
@ -167,16 +166,16 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType {
|
|||||||
if (false) {
|
if (false) {
|
||||||
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessThan)) {
|
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessThan)) {
|
||||||
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, false); };
|
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, false); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x < val2); });
|
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x < val2); });
|
||||||
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessEqual)) {
|
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessEqual)) {
|
||||||
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, true); };
|
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, true); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x <= val2); });
|
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x <= val2); });
|
||||||
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessThan)) {
|
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessThan)) {
|
||||||
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, false); };
|
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, false); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x < val2); });
|
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x < val2); });
|
||||||
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessEqual)) {
|
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessEqual)) {
|
||||||
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, true); };
|
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, true); };
|
||||||
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x <= val2); });
|
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x <= val2); });
|
||||||
} else {
|
} else {
|
||||||
PanicInfo("unsupported range node");
|
PanicInfo("unsupported range node");
|
||||||
}
|
}
|
||||||
@ -226,4 +225,79 @@ ExecExprVisitor::visit(RangeExpr& expr) {
|
|||||||
ret_ = std::move(ret);
|
ret_ = std::move(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
auto
|
||||||
|
ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType {
|
||||||
|
auto& expr = static_cast<TermExprImpl<T>&>(expr_raw);
|
||||||
|
auto& records = segment_.get_insert_record();
|
||||||
|
auto data_type = expr.data_type_;
|
||||||
|
auto& schema = segment_.get_schema();
|
||||||
|
auto field_offset_opt = schema.get_offset(expr.field_id_);
|
||||||
|
Assert(field_offset_opt);
|
||||||
|
auto field_offset = field_offset_opt.value();
|
||||||
|
auto& field_meta = schema[field_offset];
|
||||||
|
auto vec_ptr = records.get_entity<T>(field_offset);
|
||||||
|
auto& vec = *vec_ptr;
|
||||||
|
auto num_chunk = vec.num_chunk();
|
||||||
|
RetType bitsets;
|
||||||
|
|
||||||
|
auto N = records.ack_responder_.GetAck();
|
||||||
|
|
||||||
|
// small batch
|
||||||
|
for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) {
|
||||||
|
auto& chunk = vec.get_chunk(chunk_id);
|
||||||
|
|
||||||
|
auto size = chunk_id == num_chunk - 1 ? N - chunk_id * segcore::DefaultElementPerChunk
|
||||||
|
: segcore::DefaultElementPerChunk;
|
||||||
|
|
||||||
|
boost::dynamic_bitset<> bitset(segcore::DefaultElementPerChunk);
|
||||||
|
for (int i = 0; i < size; ++i) {
|
||||||
|
auto value = chunk[i];
|
||||||
|
bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value);
|
||||||
|
bitset[i] = is_in;
|
||||||
|
}
|
||||||
|
bitsets.emplace_back(std::move(bitset));
|
||||||
|
}
|
||||||
|
return bitsets;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ExecExprVisitor::visit(TermExpr& expr) {
|
||||||
|
auto& field_meta = segment_.get_schema()[expr.field_id_];
|
||||||
|
Assert(expr.data_type_ == field_meta.get_data_type());
|
||||||
|
RetType ret;
|
||||||
|
switch (expr.data_type_) {
|
||||||
|
case DataType::BOOL: {
|
||||||
|
ret = ExecTermVisitorImpl<bool>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::INT8: {
|
||||||
|
ret = ExecTermVisitorImpl<int8_t>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::INT16: {
|
||||||
|
ret = ExecTermVisitorImpl<int16_t>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::INT32: {
|
||||||
|
ret = ExecTermVisitorImpl<int32_t>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::INT64: {
|
||||||
|
ret = ExecTermVisitorImpl<int64_t>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::FLOAT: {
|
||||||
|
ret = ExecTermVisitorImpl<float>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DataType::DOUBLE: {
|
||||||
|
ret = ExecTermVisitorImpl<double>(expr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
PanicInfo("unsupported");
|
||||||
|
}
|
||||||
|
ret_ = std::move(ret);
|
||||||
|
}
|
||||||
} // namespace milvus::query
|
} // namespace milvus::query
|
||||||
|
|||||||
@ -196,7 +196,7 @@ class ConcurrentVectorImpl : public VectorBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ssize_t
|
ssize_t
|
||||||
chunk_size() const {
|
num_chunk() const {
|
||||||
return chunks_.size();
|
return chunks_.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -24,7 +24,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector
|
|||||||
|
|
||||||
auto source = dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base);
|
auto source = dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base);
|
||||||
Assert(source);
|
Assert(source);
|
||||||
auto chunk_size = source->chunk_size();
|
auto chunk_size = source->num_chunk();
|
||||||
assert(ack_end <= chunk_size);
|
assert(ack_end <= chunk_size);
|
||||||
auto conf = get_build_conf();
|
auto conf = get_build_conf();
|
||||||
data_.grow_to_at_least(ack_end);
|
data_.grow_to_at_least(ack_end);
|
||||||
@ -87,7 +87,7 @@ void
|
|||||||
ScalarIndexingEntry<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
|
ScalarIndexingEntry<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
|
||||||
auto source = dynamic_cast<const ConcurrentVector<T>*>(vec_base);
|
auto source = dynamic_cast<const ConcurrentVector<T>*>(vec_base);
|
||||||
Assert(source);
|
Assert(source);
|
||||||
auto chunk_size = source->chunk_size();
|
auto chunk_size = source->num_chunk();
|
||||||
assert(ack_end <= chunk_size);
|
assert(ack_end <= chunk_size);
|
||||||
data_.grow_to_at_least(ack_end);
|
data_.grow_to_at_least(ack_end);
|
||||||
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {
|
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {
|
||||||
|
|||||||
@ -467,16 +467,16 @@ SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
|
|||||||
auto dim = field.get_dim();
|
auto dim = field.get_dim();
|
||||||
|
|
||||||
auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
|
auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
|
||||||
auto chunk_size = record_.uids_.chunk_size();
|
auto chunk_size = record_.uids_.num_chunk();
|
||||||
|
|
||||||
auto& uids = record_.uids_;
|
auto& uids = record_.uids_;
|
||||||
auto entities = record_.get_entity<FloatVector>(offset);
|
auto entities = record_.get_entity<FloatVector>(offset);
|
||||||
|
|
||||||
std::vector<knowhere::DatasetPtr> datasets;
|
std::vector<knowhere::DatasetPtr> datasets;
|
||||||
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
|
for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) {
|
||||||
auto entities_chunk = entities->get_chunk(chunk_id).data();
|
auto entities_chunk = entities->get_chunk(chunk_id).data();
|
||||||
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
|
int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
|
||||||
: DefaultElementPerChunk;
|
: DefaultElementPerChunk;
|
||||||
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
|
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
|
||||||
}
|
}
|
||||||
for (auto& ds : datasets) {
|
for (auto& ds : datasets) {
|
||||||
|
|||||||
@ -241,10 +241,10 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
|
|||||||
auto entities = record_.get_entity<FloatVector>(offset);
|
auto entities = record_.get_entity<FloatVector>(offset);
|
||||||
|
|
||||||
std::vector<knowhere::DatasetPtr> datasets;
|
std::vector<knowhere::DatasetPtr> datasets;
|
||||||
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
|
for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) {
|
||||||
auto entities_chunk = entities->get_chunk(chunk_id).data();
|
auto entities_chunk = entities->get_chunk(chunk_id).data();
|
||||||
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
|
int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
|
||||||
: DefaultElementPerChunk;
|
: DefaultElementPerChunk;
|
||||||
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
|
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
|
||||||
}
|
}
|
||||||
for (auto& ds : datasets) {
|
for (auto& ds : datasets) {
|
||||||
|
|||||||
@ -26,4 +26,5 @@ target_link_libraries(all_tests
|
|||||||
pthread
|
pthread
|
||||||
milvus_utils
|
milvus_utils
|
||||||
)
|
)
|
||||||
|
|
||||||
install (TARGETS all_tests DESTINATION unittest)
|
install (TARGETS all_tests DESTINATION unittest)
|
||||||
|
|||||||
12
internal/core/unittest/test_common.cpp
Normal file
12
internal/core/unittest/test_common.cpp
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
@ -52,7 +52,7 @@ TEST(ConcurrentVector, TestSingle) {
|
|||||||
c_vec.set_data(total_count, vec.data(), insert_size);
|
c_vec.set_data(total_count, vec.data(), insert_size);
|
||||||
total_count += insert_size;
|
total_count += insert_size;
|
||||||
}
|
}
|
||||||
ASSERT_EQ(c_vec.chunk_size(), (total_count + 31) / 32);
|
ASSERT_EQ(c_vec.num_chunk(), (total_count + 31) / 32);
|
||||||
for (int i = 0; i < total_count; ++i) {
|
for (int i = 0; i < total_count; ++i) {
|
||||||
for (int d = 0; d < dim; ++d) {
|
for (int d = 0; d < dim; ++d) {
|
||||||
auto std_data = d + i * dim;
|
auto std_data = d + i * dim;
|
||||||
|
|||||||
@ -321,7 +321,88 @@ TEST(Expr, TestRange) {
|
|||||||
auto ans = final[vec_id][offset];
|
auto ans = final[vec_id][offset];
|
||||||
|
|
||||||
auto val = age_col[i];
|
auto val = age_col[i];
|
||||||
auto ref = !ref_func(val);
|
auto ref = ref_func(val);
|
||||||
|
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(Expr, TestTerm) {
|
||||||
|
using namespace milvus::query;
|
||||||
|
using namespace milvus::segcore;
|
||||||
|
auto vec_2k_3k = [] {
|
||||||
|
std::string buf = "[";
|
||||||
|
for (int i = 2000; i < 3000 - 1; ++i) {
|
||||||
|
buf += std::to_string(i) + ", ";
|
||||||
|
}
|
||||||
|
buf += std::to_string(2999) + "]";
|
||||||
|
return buf;
|
||||||
|
}();
|
||||||
|
|
||||||
|
std::vector<std::tuple<std::string, std::function<bool(int)>>> testcases = {
|
||||||
|
{R"([2000, 3000])", [](int v) { return v == 2000 || v == 3000; }},
|
||||||
|
{R"([2000])", [](int v) { return v == 2000; }},
|
||||||
|
{R"([3000])", [](int v) { return v == 3000; }},
|
||||||
|
{vec_2k_3k, [](int v) { return 2000 <= v && v < 3000; }},
|
||||||
|
};
|
||||||
|
|
||||||
|
std::string dsl_string_tmp = R"(
|
||||||
|
{
|
||||||
|
"bool": {
|
||||||
|
"must": [
|
||||||
|
{
|
||||||
|
"term": {
|
||||||
|
"age": @@@@
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"vector": {
|
||||||
|
"fakevec": {
|
||||||
|
"metric_type": "L2",
|
||||||
|
"params": {
|
||||||
|
"nprobe": 10
|
||||||
|
},
|
||||||
|
"query": "$0",
|
||||||
|
"topk": 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
})";
|
||||||
|
auto schema = std::make_shared<Schema>();
|
||||||
|
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
|
||||||
|
schema->AddField("age", DataType::INT32);
|
||||||
|
|
||||||
|
auto seg = CreateSegment(schema);
|
||||||
|
int N = 10000;
|
||||||
|
std::vector<int> age_col;
|
||||||
|
int num_iters = 100;
|
||||||
|
for (int iter = 0; iter < num_iters; ++iter) {
|
||||||
|
auto raw_data = DataGen(schema, N, iter);
|
||||||
|
auto new_age_col = raw_data.get_col<int>(1);
|
||||||
|
age_col.insert(age_col.end(), new_age_col.begin(), new_age_col.end());
|
||||||
|
seg->PreInsert(N);
|
||||||
|
seg->Insert(iter * N, N, raw_data.row_ids_.data(), raw_data.timestamps_.data(), raw_data.raw_);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto seg_promote = dynamic_cast<SegmentSmallIndex*>(seg.get());
|
||||||
|
ExecExprVisitor visitor(*seg_promote);
|
||||||
|
for (auto [clause, ref_func] : testcases) {
|
||||||
|
auto loc = dsl_string_tmp.find("@@@@");
|
||||||
|
auto dsl_string = dsl_string_tmp;
|
||||||
|
dsl_string.replace(loc, 4, clause);
|
||||||
|
auto plan = CreatePlan(*schema, dsl_string);
|
||||||
|
auto final = visitor.call_child(*plan->plan_node_->predicate_.value());
|
||||||
|
EXPECT_EQ(final.size(), upper_div(N * num_iters, DefaultElementPerChunk));
|
||||||
|
|
||||||
|
for (int i = 0; i < N * num_iters; ++i) {
|
||||||
|
auto vec_id = i / DefaultElementPerChunk;
|
||||||
|
auto offset = i % DefaultElementPerChunk;
|
||||||
|
auto ans = final[vec_id][offset];
|
||||||
|
|
||||||
|
auto val = age_col[i];
|
||||||
|
auto ref = ref_func(val);
|
||||||
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
|
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -31,6 +31,14 @@ struct GeneratedData {
|
|||||||
memcpy(ret.data(), target.data(), target.size());
|
memcpy(ret.data(), target.data(), target.size());
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
template <typename T>
|
||||||
|
auto
|
||||||
|
get_mutable_col(int index) {
|
||||||
|
auto& target = cols_.at(index);
|
||||||
|
assert(target.size() == row_ids_.size() * sizeof(T));
|
||||||
|
auto ptr = reinterpret_cast<T*>(target.data());
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
GeneratedData() = default;
|
GeneratedData() = default;
|
||||||
@ -58,6 +66,9 @@ GeneratedData::generate_rows(int N, SchemaPtr schema) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
rows_ = std::move(result);
|
rows_ = std::move(result);
|
||||||
|
raw_.raw_data = rows_.data();
|
||||||
|
raw_.sizeof_per_row = schema->get_total_sizeof();
|
||||||
|
raw_.count = N;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline GeneratedData
|
inline GeneratedData
|
||||||
@ -129,14 +140,12 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) {
|
|||||||
}
|
}
|
||||||
GeneratedData res;
|
GeneratedData res;
|
||||||
res.cols_ = std::move(cols);
|
res.cols_ = std::move(cols);
|
||||||
res.generate_rows(N, schema);
|
|
||||||
for (int i = 0; i < N; ++i) {
|
for (int i = 0; i < N; ++i) {
|
||||||
res.row_ids_.push_back(i);
|
res.row_ids_.push_back(i);
|
||||||
res.timestamps_.push_back(i);
|
res.timestamps_.push_back(i);
|
||||||
}
|
}
|
||||||
res.raw_.raw_data = res.rows_.data();
|
|
||||||
res.raw_.sizeof_per_row = schema->get_total_sizeof();
|
res.generate_rows(N, schema);
|
||||||
res.raw_.count = N;
|
|
||||||
return std::move(res);
|
return std::move(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
99
internal/storage/binlog_reader.go
Normal file
99
internal/storage/binlog_reader.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BinlogReader struct {
|
||||||
|
magicNumber int32
|
||||||
|
descriptorEvent
|
||||||
|
currentEventReader *EventReader
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
bufferLength int
|
||||||
|
currentOffset int32
|
||||||
|
isClose bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
|
||||||
|
if reader.isClose {
|
||||||
|
return nil, errors.New("bin log reader is closed")
|
||||||
|
}
|
||||||
|
if reader.currentEventReader != nil {
|
||||||
|
reader.currentOffset = reader.currentEventReader.NextPosition
|
||||||
|
if err := reader.currentEventReader.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reader.currentEventReader = nil
|
||||||
|
if reader.currentOffset >= int32(reader.buffer.Len()) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// skip remaining bytes of this event
|
||||||
|
remaining := int(reader.currentOffset) - (reader.bufferLength - reader.buffer.Len())
|
||||||
|
reader.buffer.Next(remaining)
|
||||||
|
}
|
||||||
|
if reader.currentOffset >= int32(reader.buffer.Len()) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
eventReader, err := newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reader.currentEventReader = eventReader
|
||||||
|
return reader.currentEventReader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *BinlogReader) readMagicNumber() (int32, error) {
|
||||||
|
if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
reader.currentOffset = 4
|
||||||
|
if reader.magicNumber != MagicNumber {
|
||||||
|
return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(MagicNumber) +
|
||||||
|
", actual: " + strconv.Itoa(int(reader.magicNumber)))
|
||||||
|
}
|
||||||
|
|
||||||
|
return reader.magicNumber, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
|
||||||
|
event, err := ReadDescriptorEvent(reader.buffer)
|
||||||
|
reader.currentOffset = event.NextPosition
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reader.descriptorEvent = *event
|
||||||
|
return &reader.descriptorEvent, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *BinlogReader) Close() error {
|
||||||
|
if reader.isClose {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
reader.isClose = true
|
||||||
|
if reader.currentEventReader != nil {
|
||||||
|
if err := reader.currentEventReader.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBinlogReader(data []byte) (*BinlogReader, error) {
|
||||||
|
reader := &BinlogReader{
|
||||||
|
buffer: bytes.NewBuffer(data),
|
||||||
|
bufferLength: len(data),
|
||||||
|
isClose: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := reader.readMagicNumber(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, err := reader.readDescriptorEvent(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
299
internal/storage/binlog_writer.go
Normal file
299
internal/storage/binlog_writer.go
Normal file
@ -0,0 +1,299 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// todo : put to param table
|
||||||
|
ServerID = 1
|
||||||
|
BinlogVersion = 1
|
||||||
|
CommitID = 1
|
||||||
|
ServerVersion = 1
|
||||||
|
HeaderLength = 17
|
||||||
|
)
|
||||||
|
|
||||||
|
type BinlogType int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
InsertBinlog BinlogType = iota
|
||||||
|
DeleteBinlog
|
||||||
|
DDLBinlog
|
||||||
|
)
|
||||||
|
const (
|
||||||
|
MagicNumber = 0xfffabc
|
||||||
|
)
|
||||||
|
|
||||||
|
type baseBinlogWriter struct {
|
||||||
|
descriptorEvent
|
||||||
|
magicNumber int32
|
||||||
|
binlogType BinlogType
|
||||||
|
eventWriters []EventWriter
|
||||||
|
currentEventWriter EventWriter
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
numEvents int32
|
||||||
|
numRows int32
|
||||||
|
isClose bool
|
||||||
|
offset int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseBinlogWriter) checkClose() error {
|
||||||
|
if writer.isClose {
|
||||||
|
return errors.New("insert binlog writer is already closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseBinlogWriter) appendEventWriter() error {
|
||||||
|
if writer.currentEventWriter != nil {
|
||||||
|
if err := writer.currentEventWriter.Finish(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.eventWriters = append(writer.eventWriters, writer.currentEventWriter)
|
||||||
|
length, err := writer.currentEventWriter.GetMemoryUsageInBytes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
writer.offset += length
|
||||||
|
writer.numEvents++
|
||||||
|
nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
writer.numRows += int32(nums)
|
||||||
|
writer.currentEventWriter = nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseBinlogWriter) GetEventNums() int32 {
|
||||||
|
return writer.numEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseBinlogWriter) GetRowNums() (int32, error) {
|
||||||
|
var res = writer.numRows
|
||||||
|
if writer.currentEventWriter != nil {
|
||||||
|
nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter()
|
||||||
|
if err != nil {
|
||||||
|
}
|
||||||
|
res += int32(nums)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
|
||||||
|
return writer.binlogType
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBuffer get binlog buffer. Return nil if binlog is not finished yet.
|
||||||
|
func (writer *baseBinlogWriter) GetBuffer() []byte {
|
||||||
|
if writer.buffer != nil {
|
||||||
|
return writer.buffer.Bytes()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close allocate buffer and release resource
|
||||||
|
func (writer *baseBinlogWriter) Close() error {
|
||||||
|
if writer.isClose {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
writer.isClose = true
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
writer.buffer = new(bytes.Buffer)
|
||||||
|
if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, w := range writer.eventWriters {
|
||||||
|
if err := w.Write(writer.buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// close all writers
|
||||||
|
for _, e := range writer.eventWriters {
|
||||||
|
if err := e.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type InsertBinlogWriter struct {
|
||||||
|
baseBinlogWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newInsertEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteBinlogWriter struct {
|
||||||
|
baseBinlogWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newDeleteEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type DDLBinlogWriter struct {
|
||||||
|
baseBinlogWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newCreateCollectionEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newDropCollectionEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newCreatePartitionEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
|
||||||
|
if err := writer.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writer.appendEventWriter(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event, err := newDropPartitionEventWriter(writer.PayloadDataType, writer.offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer.currentEventWriter = event
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter {
|
||||||
|
descriptorEvent := newDescriptorEvent()
|
||||||
|
descriptorEvent.PayloadDataType = dataType
|
||||||
|
return &InsertBinlogWriter{
|
||||||
|
baseBinlogWriter: baseBinlogWriter{
|
||||||
|
descriptorEvent: descriptorEvent,
|
||||||
|
magicNumber: MagicNumber,
|
||||||
|
binlogType: InsertBinlog,
|
||||||
|
eventWriters: make([]EventWriter, 0),
|
||||||
|
currentEventWriter: nil,
|
||||||
|
buffer: nil,
|
||||||
|
numEvents: 0,
|
||||||
|
numRows: 0,
|
||||||
|
isClose: false,
|
||||||
|
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter {
|
||||||
|
descriptorEvent := newDescriptorEvent()
|
||||||
|
descriptorEvent.PayloadDataType = dataType
|
||||||
|
return &DeleteBinlogWriter{
|
||||||
|
baseBinlogWriter: baseBinlogWriter{
|
||||||
|
descriptorEvent: descriptorEvent,
|
||||||
|
magicNumber: MagicNumber,
|
||||||
|
binlogType: DeleteBinlog,
|
||||||
|
eventWriters: make([]EventWriter, 0),
|
||||||
|
currentEventWriter: nil,
|
||||||
|
buffer: nil,
|
||||||
|
numEvents: 0,
|
||||||
|
numRows: 0,
|
||||||
|
isClose: false,
|
||||||
|
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter {
|
||||||
|
descriptorEvent := newDescriptorEvent()
|
||||||
|
descriptorEvent.PayloadDataType = dataType
|
||||||
|
return &DDLBinlogWriter{
|
||||||
|
baseBinlogWriter: baseBinlogWriter{
|
||||||
|
descriptorEvent: descriptorEvent,
|
||||||
|
magicNumber: MagicNumber,
|
||||||
|
binlogType: DDLBinlog,
|
||||||
|
eventWriters: make([]EventWriter, 0),
|
||||||
|
currentEventWriter: nil,
|
||||||
|
buffer: nil,
|
||||||
|
numEvents: 0,
|
||||||
|
numRows: 0,
|
||||||
|
isClose: false,
|
||||||
|
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
52
internal/storage/binlog_writer_test.go
Normal file
52
internal/storage/binlog_writer_test.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBinlogWriterReader(t *testing.T) {
|
||||||
|
binlogWriter := NewInsertBinlogWriter(schemapb.DataType_INT32)
|
||||||
|
defer binlogWriter.Close()
|
||||||
|
eventWriter, err := binlogWriter.NextInsertEventWriter()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Nil(t, nil, binlogWriter.GetBuffer())
|
||||||
|
err = binlogWriter.Close()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 1, binlogWriter.GetEventNums())
|
||||||
|
nums, err := binlogWriter.GetRowNums()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 3, nums)
|
||||||
|
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3})
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
nums, err = binlogWriter.GetRowNums()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 3, nums)
|
||||||
|
|
||||||
|
buffer := binlogWriter.GetBuffer()
|
||||||
|
fmt.Println("reader offset : " + strconv.Itoa(len(buffer)))
|
||||||
|
|
||||||
|
binlogReader, err := NewBinlogReader(buffer)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
eventReader, err := binlogReader.NextEventReader()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
_, err = eventReader.GetInt8FromPayload()
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
payload, err := eventReader.GetInt32FromPayload()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 3, len(payload))
|
||||||
|
assert.EqualValues(t, 1, payload[0])
|
||||||
|
assert.EqualValues(t, 2, payload[1])
|
||||||
|
assert.EqualValues(t, 3, payload[2])
|
||||||
|
|
||||||
|
reader, err := binlogReader.NextEventReader()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
fmt.Println("reader offset : " + strconv.Itoa(int(binlogReader.currentOffset)))
|
||||||
|
assert.Nil(t, reader)
|
||||||
|
}
|
||||||
347
internal/storage/event_data.go
Normal file
347
internal/storage/event_data.go
Normal file
@ -0,0 +1,347 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type descriptorEventData struct {
|
||||||
|
DescriptorEventDataFixPart
|
||||||
|
PostHeaderLengths []uint8
|
||||||
|
}
|
||||||
|
|
||||||
|
type DescriptorEventDataFixPart struct {
|
||||||
|
BinlogVersion int16
|
||||||
|
ServerVersion int64
|
||||||
|
CommitID int64
|
||||||
|
HeaderLength int8
|
||||||
|
CollectionID int64
|
||||||
|
PartitionID int64
|
||||||
|
SegmentID int64
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
PayloadDataType schemapb.DataType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = data.Write(buf)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *descriptorEventData) Write(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
|
||||||
|
event := newDescriptorEventData()
|
||||||
|
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventData interface {
|
||||||
|
GetEventDataSize() int32
|
||||||
|
WriteEventData(buffer io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// all event types' fixed part only have start Timestamp and end Timestamp yet, but maybe different events will
|
||||||
|
// have different fields later, so we just create a event data struct per event type.
|
||||||
|
type insertEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *insertEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *insertEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
return binary.Write(buffer, binary.LittleEndian, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
type deleteEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *deleteEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *deleteEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type createCollectionEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createCollectionEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropCollectionEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropCollectionEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type createPartitionEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createPartitionEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropPartitionEventData struct {
|
||||||
|
StartTimestamp typeutil.Timestamp
|
||||||
|
EndTimestamp typeutil.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.StartTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||||
|
data.EndTimestamp = timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropPartitionEventData) GetEventDataSize() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDescriptorEventData() descriptorEventData {
|
||||||
|
data := descriptorEventData{
|
||||||
|
DescriptorEventDataFixPart: DescriptorEventDataFixPart{
|
||||||
|
BinlogVersion: BinlogVersion,
|
||||||
|
ServerVersion: ServerVersion,
|
||||||
|
CommitID: CommitID,
|
||||||
|
CollectionID: -1,
|
||||||
|
PartitionID: -1,
|
||||||
|
SegmentID: -1,
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
PayloadDataType: -1,
|
||||||
|
},
|
||||||
|
PostHeaderLengths: []uint8{16, 16, 16, 16, 16, 16},
|
||||||
|
}
|
||||||
|
data.HeaderLength = int8(data.GetMemoryUsageInBytes())
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInsertEventData() insertEventData {
|
||||||
|
return insertEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newDeleteEventData() deleteEventData {
|
||||||
|
return deleteEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newCreateCollectionEventData() createCollectionEventData {
|
||||||
|
return createCollectionEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newDropCollectionEventData() dropCollectionEventData {
|
||||||
|
return dropCollectionEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newCreatePartitionEventData() createPartitionEventData {
|
||||||
|
return createPartitionEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newDropPartitionEventData() dropPartitionEventData {
|
||||||
|
return dropPartitionEventData{
|
||||||
|
StartTimestamp: 0,
|
||||||
|
EndTimestamp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func readInsertEventData(buffer io.Reader) (*insertEventData, error) {
|
||||||
|
data := &insertEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) {
|
||||||
|
data := &deleteEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData, error) {
|
||||||
|
data := &createCollectionEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, error) {
|
||||||
|
data := &dropCollectionEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, error) {
|
||||||
|
data := &createPartitionEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDropPartitionEventData(buffer io.Reader) (*dropPartitionEventData, error) {
|
||||||
|
data := &dropPartitionEventData{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
76
internal/storage/event_header.go
Normal file
76
internal/storage/event_header.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type baseEventHeader struct {
|
||||||
|
Timestamp typeutil.Timestamp
|
||||||
|
TypeCode EventTypeCode
|
||||||
|
ServerID int32
|
||||||
|
EventLength int32
|
||||||
|
NextPosition int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (header *baseEventHeader) GetMemoryUsageInBytes() int32 {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
binary.Write(buf, binary.LittleEndian, header)
|
||||||
|
return int32(buf.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (header *baseEventHeader) Write(buffer io.Writer) error {
|
||||||
|
return binary.Write(buffer, binary.LittleEndian, header)
|
||||||
|
}
|
||||||
|
|
||||||
|
type descriptorEventHeader = baseEventHeader
|
||||||
|
|
||||||
|
type eventHeader struct {
|
||||||
|
baseEventHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
func readEventHeader(buffer io.Reader) (*eventHeader, error) {
|
||||||
|
header := &eventHeader{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, header); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return header, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error) {
|
||||||
|
header := &descriptorEventHeader{}
|
||||||
|
if err := binary.Read(buffer, binary.LittleEndian, header); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return header, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDescriptorEventHeader() descriptorEventHeader {
|
||||||
|
header := descriptorEventHeader{
|
||||||
|
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||||
|
TypeCode: DescriptorEventType,
|
||||||
|
ServerID: ServerID,
|
||||||
|
}
|
||||||
|
header.EventLength = header.GetMemoryUsageInBytes()
|
||||||
|
header.NextPosition = header.EventLength + 4
|
||||||
|
return header
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventHeader(eventTypeCode EventTypeCode) eventHeader {
|
||||||
|
return eventHeader{
|
||||||
|
baseEventHeader: baseEventHeader{
|
||||||
|
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||||
|
TypeCode: eventTypeCode,
|
||||||
|
ServerID: ServerID,
|
||||||
|
EventLength: -1,
|
||||||
|
NextPosition: -1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
99
internal/storage/event_reader.go
Normal file
99
internal/storage/event_reader.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EventReader struct {
|
||||||
|
eventHeader
|
||||||
|
eventData
|
||||||
|
PayloadReaderInterface
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
isClosed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *EventReader) checkClose() error {
|
||||||
|
if reader.isClosed {
|
||||||
|
return errors.New("event reader is closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *EventReader) readHeader() (*eventHeader, error) {
|
||||||
|
if err := reader.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
header, err := readEventHeader(reader.buffer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reader.eventHeader = *header
|
||||||
|
return &reader.eventHeader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *EventReader) readData() (eventData, error) {
|
||||||
|
if err := reader.checkClose(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var data eventData
|
||||||
|
var err error
|
||||||
|
switch reader.TypeCode {
|
||||||
|
case InsertEventType:
|
||||||
|
data, err = readInsertEventData(reader.buffer)
|
||||||
|
case DeleteEventType:
|
||||||
|
data, err = readDeleteEventData(reader.buffer)
|
||||||
|
case CreateCollectionEventType:
|
||||||
|
data, err = readCreateCollectionEventData(reader.buffer)
|
||||||
|
case DropCollectionEventType:
|
||||||
|
data, err = readDropCollectionEventData(reader.buffer)
|
||||||
|
case CreatePartitionEventType:
|
||||||
|
data, err = readCreatePartitionEventData(reader.buffer)
|
||||||
|
case DropPartitionEventType:
|
||||||
|
data, err = readDropPartitionEventData(reader.buffer)
|
||||||
|
default:
|
||||||
|
return nil, errors.New("unknown header type code: " + strconv.Itoa(int(reader.TypeCode)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reader.eventData = data
|
||||||
|
return reader.eventData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reader *EventReader) Close() error {
|
||||||
|
if !reader.isClosed {
|
||||||
|
reader.isClosed = true
|
||||||
|
return reader.PayloadReaderInterface.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) {
|
||||||
|
reader := &EventReader{
|
||||||
|
eventHeader: eventHeader{
|
||||||
|
baseEventHeader{},
|
||||||
|
},
|
||||||
|
buffer: buffer,
|
||||||
|
isClosed: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := reader.readHeader(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := reader.readData(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadReader, err := NewPayloadReader(datatype, buffer.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reader.PayloadReaderInterface = payloadReader
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
293
internal/storage/event_writer.go
Normal file
293
internal/storage/event_writer.go
Normal file
@ -0,0 +1,293 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EventTypeCode int8
|
||||||
|
|
||||||
|
const (
|
||||||
|
DescriptorEventType EventTypeCode = iota
|
||||||
|
InsertEventType
|
||||||
|
DeleteEventType
|
||||||
|
CreateCollectionEventType
|
||||||
|
DropCollectionEventType
|
||||||
|
CreatePartitionEventType
|
||||||
|
DropPartitionEventType
|
||||||
|
)
|
||||||
|
|
||||||
|
func (code EventTypeCode) String() string {
|
||||||
|
codes := []string{"DescriptorEventType", "InsertEventType", "DeleteEventType", "CreateCollectionEventType", "DropCollectionEventType",
|
||||||
|
"CreatePartitionEventType", "DropPartitionEventType"}
|
||||||
|
if len(codes) < int(code) {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return codes[code]
|
||||||
|
}
|
||||||
|
|
||||||
|
type descriptorEvent struct {
|
||||||
|
descriptorEventHeader
|
||||||
|
descriptorEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
func (event *descriptorEvent) GetMemoryUsageInBytes() int32 {
|
||||||
|
return event.descriptorEventHeader.GetMemoryUsageInBytes() + event.descriptorEventData.GetMemoryUsageInBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (event *descriptorEvent) Write(buffer io.Writer) error {
|
||||||
|
if err := event.descriptorEventHeader.Write(buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := event.descriptorEventData.Write(buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error) {
|
||||||
|
header, err := readDescriptorEventHeader(buffer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
data, err := readDescriptorEventData(buffer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &descriptorEvent{
|
||||||
|
descriptorEventHeader: *header,
|
||||||
|
descriptorEventData: *data,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventWriter interface {
|
||||||
|
PayloadWriterInterface
|
||||||
|
// Finish set meta in header and no data can be added to event writer
|
||||||
|
Finish() error
|
||||||
|
// Close release resources
|
||||||
|
Close() error
|
||||||
|
// Write serialize to buffer, should call Finish first
|
||||||
|
Write(buffer *bytes.Buffer) error
|
||||||
|
GetMemoryUsageInBytes() (int32, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type baseEventWriter struct {
|
||||||
|
eventHeader
|
||||||
|
PayloadWriterInterface
|
||||||
|
isClosed bool
|
||||||
|
isFinish bool
|
||||||
|
offset int32
|
||||||
|
getEventDataSize func() int32
|
||||||
|
writeEventData func(buffer io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) {
|
||||||
|
data, err := writer.GetPayloadBufferFromWriter()
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
return writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() +
|
||||||
|
int32(len(data)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error {
|
||||||
|
if err := writer.eventHeader.Write(buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := writer.writeEventData(buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := writer.GetPayloadBufferFromWriter()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := binary.Write(buffer, binary.LittleEndian, data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseEventWriter) Finish() error {
|
||||||
|
if !writer.isFinish {
|
||||||
|
writer.isFinish = true
|
||||||
|
if err := writer.FinishPayloadWriter(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
eventLength, err := writer.GetMemoryUsageInBytes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
writer.EventLength = eventLength
|
||||||
|
writer.NextPosition = eventLength + writer.offset
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (writer *baseEventWriter) Close() error {
|
||||||
|
if !writer.isClosed {
|
||||||
|
writer.isFinish = true
|
||||||
|
writer.isClosed = true
|
||||||
|
if err := writer.ReleasePayloadWriter(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type insertEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
insertEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
type deleteEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
deleteEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
type createCollectionEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
createCollectionEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropCollectionEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
dropCollectionEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
type createPartitionEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
createPartitionEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropPartitionEventWriter struct {
|
||||||
|
baseEventWriter
|
||||||
|
dropPartitionEventData
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDescriptorEvent() descriptorEvent {
|
||||||
|
return descriptorEvent{
|
||||||
|
descriptorEventHeader: newDescriptorEventHeader(),
|
||||||
|
descriptorEventData: newDescriptorEventData(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &insertEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(InsertEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
insertEventData: newInsertEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &deleteEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(DeleteEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
deleteEventData: newDeleteEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*createCollectionEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &createCollectionEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(CreateCollectionEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
createCollectionEventData: newCreateCollectionEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dropCollectionEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &dropCollectionEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(DropCollectionEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
dropCollectionEventData: newDropCollectionEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*createPartitionEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &createPartitionEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(CreatePartitionEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
createPartitionEventData: newCreatePartitionEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dropPartitionEventWriter, error) {
|
||||||
|
payloadWriter, err := NewPayloadWriter(dataType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := &dropPartitionEventWriter{
|
||||||
|
baseEventWriter: baseEventWriter{
|
||||||
|
eventHeader: newEventHeader(DropPartitionEventType),
|
||||||
|
PayloadWriterInterface: payloadWriter,
|
||||||
|
isClosed: false,
|
||||||
|
isFinish: false,
|
||||||
|
offset: offset,
|
||||||
|
},
|
||||||
|
dropPartitionEventData: newDropPartitionEventData(),
|
||||||
|
}
|
||||||
|
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataSize
|
||||||
|
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
44
internal/storage/event_writer_test.go
Normal file
44
internal/storage/event_writer_test.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEventWriter(t *testing.T) {
|
||||||
|
insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
defer insertEvent.Close()
|
||||||
|
err = insertEvent.Close()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
insertEvent, err = newInsertEventWriter(schemapb.DataType_INT32, 0)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
err = insertEvent.AddInt64ToPayload([]int64{1, 1})
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
err = insertEvent.AddInt32ToPayload([]int32{1, 2, 3})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
nums, err := insertEvent.GetPayloadLengthFromWriter()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 3, nums)
|
||||||
|
err = insertEvent.Finish()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
length, err := insertEvent.GetMemoryUsageInBytes()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, length, insertEvent.EventLength)
|
||||||
|
err = insertEvent.AddInt32ToPayload([]int32{1})
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
buffer := new(bytes.Buffer)
|
||||||
|
err = insertEvent.Write(buffer)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
length, err = insertEvent.GetMemoryUsageInBytes()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, length, buffer.Len())
|
||||||
|
err = insertEvent.Close()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
@ -16,6 +16,41 @@ import (
|
|||||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type PayloadWriterInterface interface {
|
||||||
|
AddDataToPayload(msgs interface{}, dim ...int) error
|
||||||
|
AddBoolToPayload(msgs []bool) error
|
||||||
|
AddInt8ToPayload(msgs []int8) error
|
||||||
|
AddInt16ToPayload(msgs []int16) error
|
||||||
|
AddInt32ToPayload(msgs []int32) error
|
||||||
|
AddInt64ToPayload(msgs []int64) error
|
||||||
|
AddFloatToPayload(msgs []float32) error
|
||||||
|
AddDoubleToPayload(msgs []float64) error
|
||||||
|
AddOneStringToPayload(msgs string) error
|
||||||
|
AddBinaryVectorToPayload(binVec []byte, dim int) error
|
||||||
|
AddFloatVectorToPayload(binVec []float32, dim int) error
|
||||||
|
FinishPayloadWriter() error
|
||||||
|
GetPayloadBufferFromWriter() ([]byte, error)
|
||||||
|
GetPayloadLengthFromWriter() (int, error)
|
||||||
|
ReleasePayloadWriter() error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type PayloadReaderInterface interface {
|
||||||
|
GetDataFromPayload(idx ...int) (interface{}, int, error)
|
||||||
|
GetBoolFromPayload() ([]bool, error)
|
||||||
|
GetInt8FromPayload() ([]int8, error)
|
||||||
|
GetInt16FromPayload() ([]int16, error)
|
||||||
|
GetInt32FromPayload() ([]int32, error)
|
||||||
|
GetInt64FromPayload() ([]int64, error)
|
||||||
|
GetFloatFromPayload() ([]float32, error)
|
||||||
|
GetDoubleFromPayload() ([]float64, error)
|
||||||
|
GetOneStringFromPayload(idx int) (string, error)
|
||||||
|
GetBinaryVectorFromPayload() ([]byte, int, error)
|
||||||
|
GetFloatVectorFromPayload() ([]float32, int, error)
|
||||||
|
GetPayloadLengthFromReader() (int, error)
|
||||||
|
ReleasePayloadReader() error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
type (
|
type (
|
||||||
PayloadWriter struct {
|
PayloadWriter struct {
|
||||||
payloadWriterPtr C.CPayloadWriter
|
payloadWriterPtr C.CPayloadWriter
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user