From e65cfe1e3d5bc23c37a2c8248b10ccc52ad46db4 Mon Sep 17 00:00:00 2001
From: FluorineDog
Date: Sat, 19 Dec 2020 10:36:49 +0800
Subject: [PATCH] Enable complex dsl parser

Signed-off-by: FluorineDog
---
 Makefile                                       |   2 +-
 cmd/binlog/main.go                             |  17 +
 internal/core/src/query/Plan.cpp               | 259 +++++++++----
 .../src/query/visitors/ExecExprVisitor.cpp     |   1 +
 internal/core/unittest/test_expr.cpp           | 101 +++++
 internal/storage/print_binglog_test.go         |  58 +++
 internal/storage/print_binlog.go               | 347 ++++++++++++++++++
 7 files changed, 721 insertions(+), 64 deletions(-)
 create mode 100644 cmd/binlog/main.go
 create mode 100644 internal/storage/print_binglog_test.go
 create mode 100644 internal/storage/print_binlog.go

diff --git a/Makefile b/Makefile
index c2081907a1..1e19455f47 100644
--- a/Makefile
+++ b/Makefile
@@ -61,7 +61,7 @@ ruleguard:
 verifiers: getdeps cppcheck fmt lint ruleguard
 
 # Builds various components locally.
-build-go:
+build-go: build-cpp
 	@echo "Building each component's binary to './bin'"
 	@echo "Building master ..."
 	@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
diff --git a/cmd/binlog/main.go b/cmd/binlog/main.go
new file mode 100644
index 0000000000..113f00ea57
--- /dev/null
+++ b/cmd/binlog/main.go
@@ -0,0 +1,17 @@
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+func main() {
+	if len(os.Args) == 1 {
+		fmt.Println("usage: binlog file1 file2 ...")
+	}
+	if err := storage.PrintBinlogFiles(os.Args[1:]); err != nil {
+		fmt.Printf("error: %s\n", err.Error())
+	}
+}
diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp
index a8d21f7359..5e0b07a792 100644
--- a/internal/core/src/query/Plan.cpp
+++ b/internal/core/src/query/Plan.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include
 
 namespace milvus::query {
@@ -39,10 +40,8 @@ const std::map<std::string, RangeExpr::OpType> RangeExpr::mapping_ = {
 
 class Parser {
  public:
-    static std::unique_ptr<Plan>
-    CreatePlan(const Schema& schema, const std::string& dsl_str) {
-        return Parser(schema).CreatePlanImpl(dsl_str);
-    }
+    friend std::unique_ptr<Plan>
+    CreatePlan(const Schema& schema, const std::string& dsl_str);
 
  private:
     std::unique_ptr<Plan>
@@ -51,29 +50,55 @@ class Parser {
     CreatePlanImpl(const std::string& dsl_str);
 
     explicit Parser(const Schema& schema) : schema(schema) {
     }
 
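    // Illustration (editor's sketch, not in the diff): the DSL shape the
    // parser accepts after this change is, informally,
    //   dsl   := { "bool": node }
    //   node  := { "must": items } | { "should": items } | { "should_not": items }
    //          | { "term": ... } | { "range": ... } | { "vector": ... }
    //   items := node | [node, ...]
    // with exactly one "vector" leaf required somewhere in the tree.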
+    // vector node parser, should be called exactly once per pass.
     std::unique_ptr<VectorPlanNode>
     ParseVecNode(const Json& out_body);
 
+    // Dispatcher of all parse functions
+    // NOTE: when it returns nullptr, the node is a pure vector node
+    ExprPtr
+    ParseAnyNode(const Json& body);
+
+    ExprPtr
+    ParseMustNode(const Json& body);
+
+    ExprPtr
+    ParseShouldNode(const Json& body);
+
+    ExprPtr
+    ParseShouldNotNode(const Json& body);
+
+    // parse the value of a "should"/"must"/"should_not" entry
+    std::vector<ExprPtr>
+    ParseItemList(const Json& body);
+
+    // parse the value of a "range" entry
+    ExprPtr
+    ParseRangeNode(const Json& out_body);
+
+    // parse the value of a "term" entry
+    ExprPtr
+    ParseTermNode(const Json& out_body);
+
+ private:
+    // template implementations of the leaf parsers,
+    // used by the corresponding parsers above
     template <typename T>
-    std::unique_ptr<Expr>
+    ExprPtr
     ParseRangeNodeImpl(const std::string& field_name, const Json& body);
 
     template <typename T>
-    std::unique_ptr<Expr>
+    ExprPtr
     ParseTermNodeImpl(const std::string& field_name, const Json& body);
 
-    std::unique_ptr<Expr>
-    ParseRangeNode(const Json& out_body);
-
-    std::unique_ptr<Expr>
-    ParseTermNode(const Json& out_body);
-
  private:
     const Schema& schema;
     std::map<std::string, FieldId> tag2field_;  // PlaceholderName -> FieldId
+    std::optional<std::unique_ptr<VectorPlanNode>> vector_node_opt_;
 };
 
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseRangeNode(const Json& out_body) {
     Assert(out_body.is_object());
     Assert(out_body.size() == 1);
@@ -84,9 +109,8 @@ Parser::ParseRangeNode(const Json& out_body) {
     Assert(!field_is_vector(data_type));
 
     switch (data_type) {
-        case DataType::BOOL: {
+        case DataType::BOOL:
             return ParseRangeNodeImpl<bool>(field_name, body);
-        }
         case DataType::INT8:
             return ParseRangeNodeImpl<int8_t>(field_name, body);
         case DataType::INT16:
             return ParseRangeNodeImpl<int16_t>(field_name, body);
@@ -106,51 +130,22 @@
 std::unique_ptr<Plan>
 Parser::CreatePlanImpl(const std::string& dsl_str) {
-    auto plan = std::make_unique<Plan>(schema);
-    auto dsl = nlohmann::json::parse(dsl_str);
-    nlohmann::json vec_pack;
-    std::optional<std::unique_ptr<Expr>> predicate;
-    // top level
-    auto& bool_dsl = dsl.at("bool");
-    if (bool_dsl.contains("must")) {
-        auto& packs = bool_dsl.at("must");
-        Assert(packs.is_array());
-        for (auto& pack : packs) {
-            if (pack.contains("vector")) {
-                auto& out_body = pack.at("vector");
-                plan->plan_node_ = ParseVecNode(out_body);
-            } else if (pack.contains("term")) {
-                AssertInfo(!predicate, "unsupported complex DSL");
-                auto& out_body = pack.at("term");
-                predicate = ParseTermNode(out_body);
-            } else if (pack.contains("range")) {
-                AssertInfo(!predicate, "unsupported complex DSL");
-                auto& out_body = pack.at("range");
-                predicate = ParseRangeNode(out_body);
-            } else {
-                PanicInfo("unsupported node");
-            }
-        }
-        AssertInfo(plan->plan_node_, "vector node not found");
-    } else if (bool_dsl.contains("vector")) {
-        auto& out_body = bool_dsl.at("vector");
-        plan->plan_node_ = ParseVecNode(out_body);
-        Assert(plan->plan_node_);
-    } else {
-        PanicInfo("Unsupported DSL");
+    auto dsl = Json::parse(dsl_str);
+    auto bool_dsl = dsl.at("bool");
+    auto predicate = ParseAnyNode(bool_dsl);
+    Assert(vector_node_opt_.has_value());
+    auto vec_node = std::move(vector_node_opt_).value();
+    if (predicate != nullptr) {
+        vec_node->predicate_ = std::move(predicate);
     }
-    plan->plan_node_->predicate_ = std::move(predicate);
+
+    auto plan = std::make_unique<Plan>(schema);
     plan->tag2field_ = std::move(tag2field_);
-    // TODO: target_entry parser
-    // if schema autoid is true,
-    //     prepend target_entries_ with row_id
-    // else
-    //     with primary_key
-    //
+    plan->plan_node_ = std::move(vec_node);
     return plan;
 }
 
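 // Editor's walkthrough (not in the diff): given, say,
 //   {"bool": {"must": [{"vector": {...}}, {"term": {...}}]}}
 // ParseAnyNode returns the "term" predicate, while the "vector" leaf is
 // parked in vector_node_opt_ (that branch returns nullptr); CreatePlanImpl
 // then stitches the predicate onto the vector node it takes from that slot.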
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseTermNode(const Json& out_body) {
     Assert(out_body.size() == 1);
     auto out_iter = out_body.begin();
@@ -221,7 +216,7 @@ Parser::ParseVecNode(const Json& out_body) {
 }
 
 template <typename T>
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseTermNodeImpl(const std::string& field_name, const Json& body) {
     auto expr = std::make_unique<TermExprImpl<T>>();
     auto data_type = schema[field_name].get_data_type();
@@ -249,7 +244,7 @@ Parser::ParseTermNodeImpl(const std::string& field_name, const Json& body) {
 }
 
 template <typename T>
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseRangeNodeImpl(const std::string& field_name, const Json& body) {
     auto expr = std::make_unique<RangeExprImpl<T>>();
     auto data_type = schema[field_name].get_data_type();
@@ -278,12 +273,6 @@ Parser::ParseRangeNodeImpl(const std::string& field_name, const Json& body) {
     return expr;
 }
 
-std::unique_ptr<Plan>
-CreatePlan(const Schema& schema, const std::string& dsl_str) {
-    auto plan = Parser::CreatePlan(schema, dsl_str);
-    return plan;
-}
-
 std::unique_ptr<PlaceholderGroup>
 ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
     namespace ser = milvus::proto::service;
@@ -313,6 +302,150 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
     return result;
 }
 
+std::unique_ptr<Plan>
+CreatePlan(const Schema& schema, const std::string& dsl_str) {
+    auto plan = Parser(schema).CreatePlanImpl(dsl_str);
+    return plan;
+}
+
+std::vector<ExprPtr>
+Parser::ParseItemList(const Json& body) {
+    std::vector<ExprPtr> results;
+    if (body.is_object()) {
+        // only one item;
+        auto new_entry = ParseAnyNode(body);
+        results.emplace_back(std::move(new_entry));
+    } else {
+        // item array
+        Assert(body.is_array());
+        for (auto& item : body) {
+            auto new_entry = ParseAnyNode(item);
+            results.emplace_back(std::move(new_entry));
+        }
+    }
+    auto old_size = results.size();
+
+    auto new_end = std::remove_if(results.begin(), results.end(), [](const ExprPtr& x) { return x == nullptr; });
+
+    results.resize(new_end - results.begin());
+
+    return results;
+}
+
+ExprPtr
+Parser::ParseAnyNode(const Json& out_body) {
+    Assert(out_body.is_object());
+    Assert(out_body.size() == 1);
+
+    auto out_iter = out_body.begin();
+
+    auto key = out_iter.key();
+    auto body = out_iter.value();
+
+    if (key == "must") {
+        return ParseMustNode(body);
+    } else if (key == "should") {
+        return ParseShouldNode(body);
+    } else if (key == "should_not") {
+        return ParseShouldNotNode(body);
+    } else if (key == "range") {
+        return ParseRangeNode(body);
+    } else if (key == "term") {
+        return ParseTermNode(body);
+    } else if (key == "vector") {
+        auto vec_node = ParseVecNode(body);
+        Assert(!vector_node_opt_.has_value());
+        vector_node_opt_ = std::move(vec_node);
+        return nullptr;
+    } else {
+        PanicInfo("unsupported key: " + key);
+    }
+}
+
+template <typename Merger>
+static ExprPtr
+ConstructTree(Merger merger, std::vector<ExprPtr> item_list) {
+    if (item_list.size() == 0) {
+        return nullptr;
+    }
+
+    if (item_list.size() == 1) {
+        return std::move(item_list[0]);
+    }
+
+    // Note: use a deque to construct a binary tree
+    //        Op
+    //       /  \
+    //     Op    Op
+    //     | \   | \
+    //     A  B  C  D
+    std::deque<ExprPtr> binary_queue;
+    for (auto& item : item_list) {
+        Assert(item != nullptr);
+        binary_queue.push_back(std::move(item));
+    }
+    while (binary_queue.size() > 1) {
+        auto left = std::move(binary_queue.front());
+        binary_queue.pop_front();
+        auto right = std::move(binary_queue.front());
+        binary_queue.pop_front();
+        binary_queue.push_back(merger(std::move(left), std::move(right)));
+    }
+    Assert(binary_queue.size() == 1);
+    return std::move(binary_queue.front());
+}
+
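// Worked example (editor's note, not in the diff): for item_list = [A, B, C, D]
// the loop pops A,B and pushes (A op B), pops C,D and pushes (C op D), then
// merges those into ((A op B) op (C op D)) -- a balanced tree rather than a
// left-deep chain, so expression depth stays logarithmic in the item count.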
+ExprPtr
+Parser::ParseMustNode(const Json& body) {
+    auto item_list = ParseItemList(body);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalAnd;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    return ConstructTree(merger, std::move(item_list));
+}
+
+ExprPtr
+Parser::ParseShouldNode(const Json& body) {
+    auto item_list = ParseItemList(body);
+    Assert(item_list.size() >= 1);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalOr;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    return ConstructTree(merger, std::move(item_list));
+}
+
+ExprPtr
+Parser::ParseShouldNotNode(const Json& body) {
+    auto item_list = ParseItemList(body);
+    Assert(item_list.size() >= 1);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalAnd;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    auto subtree = ConstructTree(merger, std::move(item_list));
+
+    using OpType = BoolUnaryExpr::OpType;
+    auto res = std::make_unique<BoolUnaryExpr>();
+    res->op_type_ = OpType::LogicalNot;
+    res->child_ = std::move(subtree);
+
+    return res;
+}
+
 int64_t
 GetTopK(const Plan* plan) {
     return plan->plan_node_->query_info_.topK_;
diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp
index 35d6bd2849..e45969c38c 100644
--- a/internal/core/src/query/visitors/ExecExprVisitor.cpp
+++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp
@@ -67,6 +67,7 @@ ExecExprVisitor::visit(BoolUnaryExpr& expr) {
     switch (expr.op_type_) {
         case OpType::LogicalNot: {
             chunk.flip();
+            break;
         }
         default: {
             PanicInfo("Invalid OpType");
diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp
index 74744d3c78..e7421dfd39 100644
--- a/internal/core/unittest/test_expr.cpp
+++ b/internal/core/unittest/test_expr.cpp
@@ -410,3 +410,104 @@ TEST(Expr, TestTerm) {
         }
     }
 }
+
+TEST(Expr, TestSimpleDsl) {
+    using namespace milvus::query;
+    using namespace milvus::segcore;
+
+    auto vec_dsl = Json::parse(R"(
+{
+    "vector": {
+        "fakevec": {
+            "metric_type": "L2",
+            "params": {
+                "nprobe": 10
+            },
+            "query": "$0",
+            "topk": 10
+        }
+    }
+}
+)");
+
+    int N = 32;
+    auto get_item = [&](int base, int bit = 1) {
+        std::vector<int> terms;
+        // note: random gen range is [0, 2N)
+        for (int i = 0; i < N * 2; ++i) {
+            if (((i >> base) & 0x1) == bit) {
+                terms.push_back(i);
+            }
+        }
+        Json s;
+        s["term"]["age"]["values"] = terms;
+        return s;
+    };
+    // std::cout << get_item(0).dump(-2);
+    // std::cout << vec_dsl.dump(-2);
+    std::vector<std::tuple<Json, std::function<bool(int)>>> testcases;
+    {
+        Json dsl;
+        dsl["must"] = Json::array({vec_dsl, get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) == 0b1011; });
+    }
+
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["must"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) == 0b1011; });
+    }
+
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["should"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return !!((x & 0b1111) ^ 0b0100); });
+    }
+
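    // Editor's note on the expected masks (not in the diff): get_item(b, bit)
    // keeps values whose b-th bit equals `bit`, so a "must" over bits
    // {0:1, 1:1, 2:0, 3:1} accepts exactly (x & 0b1111) == 0b1011, while the
    // "should" (OR) version rejects only the lone pattern 0b0100, i.e. it
    // accepts whenever (x & 0b1111) ^ 0b0100 != 0.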
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["should_not"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) != 0b1011; });
+    }
+
+    auto schema = std::make_shared<Schema>();
+    schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
+    schema->AddField("age", DataType::INT32);
+
+    auto seg = CreateSegment(schema);
+    std::vector<int> age_col;
+    int num_iters = 100;
+    for (int iter = 0; iter < num_iters; ++iter) {
+        auto raw_data = DataGen(schema, N, iter);
+        auto new_age_col = raw_data.get_col<int>(1);
+        age_col.insert(age_col.end(), new_age_col.begin(), new_age_col.end());
+        seg->PreInsert(N);
+        seg->Insert(iter * N, N, raw_data.row_ids_.data(), raw_data.timestamps_.data(), raw_data.raw_);
+    }
+
+    auto seg_promote = dynamic_cast<SegmentSmallIndex*>(seg.get());
+    ExecExprVisitor visitor(*seg_promote);
+    for (auto [clause, ref_func] : testcases) {
+        Json dsl;
+        dsl["bool"] = clause;
+        // std::cout << dsl.dump(2);
+        auto plan = CreatePlan(*schema, dsl.dump());
+        auto final = visitor.call_child(*plan->plan_node_->predicate_.value());
+        EXPECT_EQ(final.size(), upper_div(N * num_iters, DefaultElementPerChunk));
+
+        for (int i = 0; i < N * num_iters; ++i) {
+            auto vec_id = i / DefaultElementPerChunk;
+            auto offset = i % DefaultElementPerChunk;
+            bool ans = final[vec_id][offset];
+            auto val = age_col[i];
+            auto ref = ref_func(val);
+            ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
+        }
+    }
+}
diff --git a/internal/storage/print_binglog_test.go b/internal/storage/print_binglog_test.go
new file mode 100644
index 0000000000..2d02f2157e
--- /dev/null
+++ b/internal/storage/print_binglog_test.go
@@ -0,0 +1,58 @@
+package storage
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
+)
+
+func TestPrintBinlogFilesInt64(t *testing.T) {
+	w, err := NewInsertBinlogWriter(schemapb.DataType_INT64, 10, 20, 30, 40)
+	assert.Nil(t, err)
+
+	curTS := time.Now().UnixNano() / int64(time.Millisecond)
+
+	e1, err := w.NextInsertEventWriter()
+	assert.Nil(t, err)
+	err = e1.AddDataToPayload([]int64{1, 2, 3})
+	assert.Nil(t, err)
+	err = e1.AddDataToPayload([]int32{4, 5, 6})
+	assert.NotNil(t, err)
+	err = e1.AddDataToPayload([]int64{4, 5, 6})
+	assert.Nil(t, err)
+	e1.SetStartTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0))
+	e1.SetEndTimestamp(tsoutil.ComposeTS(curTS+20*60*1000, 0))
+
+	e2, err := w.NextInsertEventWriter()
+	assert.Nil(t, err)
+	err = e2.AddDataToPayload([]int64{7, 8, 9})
+	assert.Nil(t, err)
+	err = e2.AddDataToPayload([]bool{true, false, true})
+	assert.NotNil(t, err)
+	err = e2.AddDataToPayload([]int64{10, 11, 12})
+	assert.Nil(t, err)
+	e2.SetStartTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0))
+	e2.SetEndTimestamp(tsoutil.ComposeTS(curTS+40*60*1000, 0))
+
+	w.SetStartTimeStamp(tsoutil.ComposeTS(curTS, 0))
+	w.SetEndTimeStamp(tsoutil.ComposeTS(curTS+3600*1000, 0))
+
+	_, err = w.GetBuffer()
+	assert.NotNil(t, err)
+	err = w.Close()
+	assert.Nil(t, err)
+	buf, err := w.GetBuffer()
+	assert.Nil(t, err)
+
+	fd, err := os.Create("/tmp/binlog_int64.db")
+	assert.Nil(t, err)
+	num, err := fd.Write(buf)
+	assert.Nil(t, err)
+	assert.Equal(t, num, len(buf))
+	err = fd.Close()
+	assert.Nil(t, err)
+}
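(Editor's illustration, not part of the patch: the test above deliberately
feeds mistyped payloads -- []int32 and []bool into an INT64 column -- to check
that AddDataToPayload rejects them, then leaves a well-formed binlog at
/tmp/binlog_int64.db that the printer below can be pointed at.)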
diff --git a/internal/storage/print_binlog.go b/internal/storage/print_binlog.go
new file mode 100644
index 0000000000..c074de2f7b
--- /dev/null
+++ b/internal/storage/print_binlog.go
@@ -0,0 +1,347 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+	"syscall"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
+)
+
+func PrintBinlogFiles(fileList []string) error {
+	for _, file := range fileList {
+		if err := printBinlogFile(file); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func printBinlogFile(filename string) error {
+	fd, err := os.OpenFile(filename, os.O_RDONLY, 0400)
+	if err != nil {
+		return err
+	}
+	defer fd.Close()
+
+	fileInfo, err := fd.Stat()
+	if err != nil {
+		return err
+	}
+
+	fmt.Printf("file size = %d\n", fileInfo.Size())
+
+	b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileInfo.Size()), syscall.PROT_READ, syscall.MAP_SHARED)
+	if err != nil {
+		return err
+	}
+	defer syscall.Munmap(b)
+
+	fmt.Printf("buf size = %d\n", len(b))
+
+	r, err := NewBinlogReader(b)
+	if err != nil {
+		return err
+	}
+	defer r.Close()
+
+	fmt.Println("descriptor event header:")
+	physical, _ := tsoutil.ParseTS(r.descriptorEvent.descriptorEventHeader.Timestamp)
+	fmt.Printf("\tTimestamp: %v\n", physical)
+	fmt.Printf("\tTypeCode: %s\n", r.descriptorEvent.descriptorEventHeader.TypeCode.String())
+	fmt.Printf("\tServerID: %d\n", r.descriptorEvent.descriptorEventHeader.ServerID)
+	fmt.Printf("\tEventLength: %d\n", r.descriptorEvent.descriptorEventHeader.EventLength)
+	fmt.Printf("\tNextPosition: %d\n", r.descriptorEvent.descriptorEventHeader.NextPosition)
+	fmt.Println("descriptor event data:")
+	fmt.Printf("\tBinlogVersion: %d\n", r.descriptorEvent.descriptorEventData.BinlogVersion)
+	fmt.Printf("\tServerVersion: %d\n", r.descriptorEvent.descriptorEventData.ServerVersion)
+	fmt.Printf("\tCommitID: %d\n", r.descriptorEvent.descriptorEventData.CommitID)
+	fmt.Printf("\tHeaderLength: %d\n", r.descriptorEvent.descriptorEventData.HeaderLength)
+	fmt.Printf("\tCollectionID: %d\n", r.descriptorEvent.descriptorEventData.CollectionID)
+	fmt.Printf("\tPartitionID: %d\n", r.descriptorEvent.descriptorEventData.PartitionID)
+	fmt.Printf("\tSegmentID: %d\n", r.descriptorEvent.descriptorEventData.SegmentID)
+	fmt.Printf("\tFieldID: %d\n", r.descriptorEvent.descriptorEventData.FieldID)
+	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.StartTimestamp)
+	fmt.Printf("\tStartTimestamp: %v\n", physical)
+	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.EndTimestamp)
+	fmt.Printf("\tEndTimestamp: %v\n", physical)
+	dataTypeName, ok := schemapb.DataType_name[int32(r.descriptorEvent.descriptorEventData.PayloadDataType)]
+	if !ok {
+		return errors.Errorf("undefined data type %d", r.descriptorEvent.descriptorEventData.PayloadDataType)
+	}
+	fmt.Printf("\tPayloadDataType: %v\n", dataTypeName)
+	fmt.Printf("\tPostHeaderLengths: %v\n", r.descriptorEvent.descriptorEventData.PostHeaderLengths)
+	eventNum := 0
+	for {
+		event, err := r.NextEventReader()
+		if err != nil {
+			return err
+		}
+		if event == nil {
+			break
+		}
+		fmt.Printf("event %d header:\n", eventNum)
+		physical, _ = tsoutil.ParseTS(event.eventHeader.Timestamp)
+		fmt.Printf("\tTimestamp: %v\n", physical)
+		fmt.Printf("\tTypeCode: %s\n", event.eventHeader.TypeCode.String())
+		fmt.Printf("\tServerID: %d\n", event.eventHeader.ServerID)
+		fmt.Printf("\tEventLength: %d\n", event.eventHeader.EventLength)
+		fmt.Printf("\tNextPosition: %d\n", event.eventHeader.NextPosition)
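		// Two groups of cases follow: INSERT/DELETE events carry column data
		// and print through printPayloadValues, while the four DDL events
		// carry serialized requests and print through printDDLPayloadValues.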
+		switch event.eventHeader.TypeCode {
+		case InsertEventType:
+			evd, ok := event.eventData.(*insertEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d insert event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
+		case DeleteEventType:
+			evd, ok := event.eventData.(*deleteEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d delete event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
+		case CreateCollectionEventType:
+			evd, ok := event.eventData.(*createCollectionEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d create collection event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
+		case DropCollectionEventType:
+			evd, ok := event.eventData.(*dropCollectionEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d drop collection event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
+		case CreatePartitionEventType:
+			evd, ok := event.eventData.(*createPartitionEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d create partition event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
+		case DropPartitionEventType:
+			evd, ok := event.eventData.(*dropPartitionEventData)
+			if !ok {
+				return errors.Errorf("incorrect event data type")
+			}
+			fmt.Printf("event %d drop partition event:\n", eventNum)
+			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
+			fmt.Printf("\tStartTimestamp: %v\n", physical)
+			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
+			fmt.Printf("\tEndTimestamp: %v\n", physical)
+			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
+				return err
+			}
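		// Any other type code is unexpected in a binlog file, so the printer
		// aborts instead of silently skipping the event.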
+		default:
+			return errors.Errorf("undefined event type %d", event.eventHeader.TypeCode)
+		}
+		eventNum++
+	}
+
+	return nil
+}
+
+func printPayloadValues(colType schemapb.DataType, reader PayloadReaderInterface) error {
+	fmt.Println("\tpayload values:")
+	switch colType {
+	case schemapb.DataType_BOOL:
+		val, err := reader.GetBoolFromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %v\n", i, v)
+		}
+	case schemapb.DataType_INT8:
+		val, err := reader.GetInt8FromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %d\n", i, v)
+		}
+	case schemapb.DataType_INT16:
+		val, err := reader.GetInt16FromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %d\n", i, v)
+		}
+	case schemapb.DataType_INT32:
+		val, err := reader.GetInt32FromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %d\n", i, v)
+		}
+	case schemapb.DataType_INT64:
+		val, err := reader.GetInt64FromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %d\n", i, v)
+		}
+	case schemapb.DataType_FLOAT:
+		val, err := reader.GetFloatFromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %f\n", i, v)
+		}
+	case schemapb.DataType_DOUBLE:
+		val, err := reader.GetDoubleFromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			fmt.Printf("\t\t%d : %f\n", i, v)
+		}
+	case schemapb.DataType_STRING:
+		rows, err := reader.GetPayloadLengthFromReader()
+		if err != nil {
+			return err
+		}
+		for i := 0; i < rows; i++ {
+			val, err := reader.GetOneStringFromPayload(i)
+			if err != nil {
+				return err
+			}
+			fmt.Printf("\t\t%d : %s\n", i, val)
+		}
+	case schemapb.DataType_VECTOR_BINARY:
+		val, dim, err := reader.GetBinaryVectorFromPayload()
+		if err != nil {
+			return err
+		}
+		dim = dim / 8
+		length := len(val) / dim
+		for i := 0; i < length; i++ {
+			fmt.Printf("\t\t%d :", i)
+			for j := 0; j < dim; j++ {
+				idx := i*dim + j
+				fmt.Printf(" %02x", val[idx])
+			}
+			fmt.Println()
+		}
+	case schemapb.DataType_VECTOR_FLOAT:
+		val, dim, err := reader.GetFloatVectorFromPayload()
+		if err != nil {
+			return err
+		}
+		length := len(val) / dim
+		for i := 0; i < length; i++ {
+			fmt.Printf("\t\t%d :", i)
+			for j := 0; j < dim; j++ {
+				idx := i*dim + j
+				fmt.Printf(" %f", val[idx])
+			}
+			fmt.Println()
+		}
+	default:
+		return errors.Errorf("undefined data type")
+	}
+	return nil
+}
+
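// Editor's note (not in the diff): DDL payloads are encoded differently from
// data payloads -- an INT64 column holds hybrid timestamps, decoded with
// tsoutil.ParseTS, while a STRING column holds serialized request protos
// whose concrete type is selected by the event type code below.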
+func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, reader PayloadReaderInterface) error {
+	fmt.Println("\tpayload values:")
+	switch colType {
+	case schemapb.DataType_INT64:
+		val, err := reader.GetInt64FromPayload()
+		if err != nil {
+			return err
+		}
+		for i, v := range val {
+			physical, logical := tsoutil.ParseTS(uint64(v))
+			fmt.Printf("\t\t%d : physical : %v ; logical : %d\n", i, physical, logical)
+		}
+	case schemapb.DataType_STRING:
+		rows, err := reader.GetPayloadLengthFromReader()
+		if err != nil {
+			return err
+		}
+		for i := 0; i < rows; i++ {
+			val, err := reader.GetOneStringFromPayload(i)
+			if err != nil {
+				return err
+			}
+			switch eventType {
+			case CreateCollectionEventType:
+				var req internalpb.CreateCollectionRequest
+				if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+					return err
+				}
+				msg := proto.MarshalTextString(&req)
+				fmt.Printf("\t\t%d : create collection: %s\n", i, msg)
+			case DropCollectionEventType:
+				var req internalpb.DropCollectionRequest
+				if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+					return err
+				}
+				msg := proto.MarshalTextString(&req)
+				fmt.Printf("\t\t%d : drop collection: %s\n", i, msg)
+			case CreatePartitionEventType:
+				var req internalpb.CreatePartitionRequest
+				if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+					return err
+				}
+				msg := proto.MarshalTextString(&req)
+				fmt.Printf("\t\t%d : create partition: %s\n", i, msg)
+			case DropPartitionEventType:
+				var req internalpb.DropPartitionRequest
+				if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+					return err
+				}
+				msg := proto.MarshalTextString(&req)
+				fmt.Printf("\t\t%d : drop partition: %s\n", i, msg)
+			default:
+				return errors.Errorf("undefined ddl event type %d", eventType)
+			}
+		}
+	default:
+		return errors.Errorf("undefined data type")
+	}
+	return nil
+}
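(Editor's illustration, not part of the patch: with this series applied, a
binlog such as the one written by TestPrintBinlogFilesInt64 can be dumped with
the new CLI, e.g. `go run ./cmd/binlog /tmp/binlog_int64.db` from the
repository root; the Makefile change only makes build-go depend on build-cpp
and does not add a dedicated build target for the binlog tool.)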