enhance: some minor code cleanup, prepare for scalar benchmark (#45008)

issue: https://github.com/milvus-io/milvus/issues/44452

---------

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
Buqian Zheng 2025-10-24 14:22:05 +08:00 committed by GitHub
parent 199f6d936e
commit c284e8c4a8
14 changed files with 339 additions and 127 deletions

View File

@@ -386,6 +386,21 @@ run-test-cpp:
@echo $(PWD)/scripts/run_cpp_unittest.sh arg=${filter}
@(env bash $(PWD)/scripts/run_cpp_unittest.sh arg=${filter})
# tool for benchmark
exprparser-tool:
@echo "Building exprparser helper ..."
@source $(PWD)/scripts/setenv.sh && \
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
GO111MODULE=on $(GO) build -pgo=$(PGO_PATH)/default.pgo -ldflags="-r $${RPATH}" -o $(INSTALL_PATH)/exprparser $(PWD)/cmd/tools/exprparser/main.go 1>/dev/null
# Build unittest with external scalar-benchmark enabled
scalar-bench: generated-proto exprparser-tool
@echo "Building Milvus cpp unittest with scalar-benchmark ... "
@(export CMAKE_EXTRA_ARGS="-DENABLE_SCALAR_BENCH=ON"; env bash $(PWD)/scripts/core_build.sh -t ${mode} -a ${use_asan} -u -n ${use_disk_index} -y ${use_dynamic_simd} ${AZURE_OPTION} -x ${index_engine} -o ${use_opendal} -f $(tantivy_features))
scalar-bench-ui:
@echo "Starting scalar-benchmark ui ... "
@(cd cmake_build/unittest/scalar-benchmark-src/ui && ./serve_ui_dev.sh)
# Run code coverage.
codecov: codecov-go codecov-cpp
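
For orientation (inferred from the targets above, not stated in the diff): the expected flow is `make exprparser-tool` to build the Go helper into $(INSTALL_PATH), `make scalar-bench` to rebuild the C++ unittests with -DENABLE_SCALAR_BENCH=ON, and `make scalar-bench-ui` to serve the benchmark UI out of the fetched scalar-benchmark checkout.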

View File

@@ -0,0 +1,115 @@
package main
import (
"bufio"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"google.golang.org/protobuf/proto"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
_ "github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type parseRequest struct {
ID string `json:"id"`
Op string `json:"op"`
SchemaB64 string `json:"schema_b64"`
Expr string `json:"expr"`
Options struct {
IsCount bool `json:"is_count"`
Limit int64 `json:"limit"`
} `json:"options"`
}
type parseResponse struct {
ID string `json:"id"`
OK bool `json:"ok"`
PlanB64 string `json:"plan_b64,omitempty"`
Error string `json:"error,omitempty"`
}
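// Example exchange, one JSON object per line on stdin/stdout (values
// abridged; the base64 payloads below are placeholders, not real encodings):
//   request:  {"id":"1","op":"parse_expr","schema_b64":"<base64 CollectionSchema>","expr":"age > 18"}
//   response: {"id":"1","ok":true,"plan_b64":"<base64 PlanNode>"}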
func handle(line string) parseResponse {
line = strings.TrimSpace(line)
if line == "" {
return parseResponse{ID: "", OK: false, Error: "empty line"}
}
var req parseRequest
if err := json.Unmarshal([]byte(line), &req); err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("invalid json: %v", err)}
}
if req.Op != "parse_expr" {
return parseResponse{ID: req.ID, OK: false, Error: "unsupported op"}
}
if req.SchemaB64 == "" {
return parseResponse{ID: req.ID, OK: false, Error: "missing schema_b64"}
}
if req.Expr == "" {
return parseResponse{ID: req.ID, OK: false, Error: "missing expr"}
}
schemaBytes, err := base64.StdEncoding.DecodeString(req.SchemaB64)
if err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("decode schema_b64 failed: %v", err)}
}
var schema schemapb.CollectionSchema
if err := proto.Unmarshal(schemaBytes, &schema); err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("unmarshal schema failed: %v", err)}
}
helper, err := typeutil.CreateSchemaHelper(&schema)
if err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("schema helper error: %v", err)}
}
planNode, err := planparserv2.CreateRetrievePlan(helper, req.Expr, nil)
if err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("parse error: %v", err)}
}
// Apply options if provided
if q := planNode.GetQuery(); q != nil {
q.IsCount = req.Options.IsCount
if req.Options.Limit > 0 {
q.Limit = req.Options.Limit
}
}
planBytes, err := proto.Marshal(planNode)
if err != nil {
return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("marshal plan failed: %v", err)}
}
return parseResponse{ID: req.ID, OK: true, PlanB64: base64.StdEncoding.EncodeToString(planBytes)}
}
func writeResp(w *bufio.Writer, resp parseResponse) {
b, _ := json.Marshal(resp)
_, _ = w.Write(b)
_ = w.WriteByte('\n')
_ = w.Flush()
}
func main() {
in := bufio.NewScanner(os.Stdin)
buf := make([]byte, 0, 1024*1024)
in.Buffer(buf, 16*1024*1024)
w := bufio.NewWriter(os.Stdout)
for {
if !in.Scan() {
if err := in.Err(); err != nil && err != io.EOF {
writeResp(w, parseResponse{ID: "", OK: false, Error: fmt.Sprintf("scan error: %v", err)})
}
break
}
resp := handle(in.Text())
writeResp(w, resp)
}
}
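
A minimal driver for this line protocol, as a sketch: it assumes the helper was built to bin/exprparser (per the Makefile's INSTALL_PATH), and the two-field schema, field IDs, and expression are illustrative placeholders.

package main

import (
	"bufio"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os/exec"

	schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Build a throwaway schema to encode into schema_b64.
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
			{FieldID: 101, Name: "age", DataType: schemapb.DataType_Int64},
		},
	}
	schemaBytes, err := proto.Marshal(schema)
	if err != nil {
		panic(err)
	}
	reqLine, _ := json.Marshal(map[string]any{
		"id":         "1",
		"op":         "parse_expr",
		"schema_b64": base64.StdEncoding.EncodeToString(schemaBytes),
		"expr":       "age > 18",
	})

	cmd := exec.Command("bin/exprparser") // assumed install location
	stdin, _ := cmd.StdinPipe()
	stdout, _ := cmd.StdoutPipe()
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// One request per line; closing stdin lets the helper's scan loop exit.
	fmt.Fprintln(stdin, string(reqLine))
	stdin.Close()
	sc := bufio.NewScanner(stdout)
	if sc.Scan() {
		fmt.Println(sc.Text()) // e.g. {"id":"1","ok":true,"plan_b64":"..."}
	}
	_ = cmd.Wait()
}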

View File

@@ -40,9 +40,6 @@
#include "common/TypeTraits.h"
namespace milvus {
using DataType = milvus::DataType;
class FieldDataBase {
public:
explicit FieldDataBase(DataType data_type, bool nullable)

View File

@@ -68,6 +68,65 @@ FieldMeta::get_analyzer_params() const {
return ParseTokenizerParams(params);
}
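// Serialize this field's metadata back into a FieldSchema proto; the inverse
// of ParseFrom() below.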
milvus::proto::schema::FieldSchema
FieldMeta::ToProto() const {
milvus::proto::schema::FieldSchema proto;
proto.set_fieldid(id_.get());
proto.set_name(name_.get());
proto.set_data_type(ToProtoDataType(type_));
proto.set_nullable(nullable_);
if (has_default_value()) {
*proto.mutable_default_value() = *default_value_;
}
if (element_type_ != DataType::NONE) {
proto.set_element_type(ToProtoDataType(element_type_));
}
auto add_type_param = [&proto](const std::string& key,
const std::string& value) {
auto* param = proto.add_type_params();
param->set_key(key);
param->set_value(value);
};
auto add_index_param = [&proto](const std::string& key,
const std::string& value) {
auto* param = proto.add_index_params();
param->set_key(key);
param->set_value(value);
};
if (type_ == DataType::VECTOR_ARRAY) {
add_type_param("dim", std::to_string(get_dim()));
if (auto metric = get_metric_type(); metric.has_value()) {
add_index_param("metric_type", metric.value());
}
} else if (IsVectorDataType(type_)) {
if (!IsSparseFloatVectorDataType(type_)) {
add_type_param("dim", std::to_string(get_dim()));
}
if (auto metric = get_metric_type(); metric.has_value()) {
add_index_param("metric_type", metric.value());
}
} else if (IsStringDataType(type_)) {
std::map<std::string, std::string> params;
if (string_info_.has_value()) {
params = string_info_->params;
}
params[MAX_LENGTH] = std::to_string(get_max_len());
params["enable_match"] = enable_match() ? "true" : "false";
params["enable_analyzer"] = enable_analyzer() ? "true" : "false";
for (const auto& [key, value] : params) {
add_type_param(key, value);
}
} else if (IsArrayDataType(type_)) {
// element_type already populated above
}
return proto;
}
FieldMeta
FieldMeta::ParseFrom(const milvus::proto::schema::FieldSchema& schema_proto) {
auto field_id = FieldId(schema_proto.fieldid());

View File

@@ -16,6 +16,7 @@
#pragma once
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
@@ -251,6 +252,9 @@ class FieldMeta {
return default_value_;
}
milvus::proto::schema::FieldSchema
ToProto() const;
size_t
get_sizeof() const {
AssertInfo(!IsSparseFloatVectorDataType(type_),

View File

@@ -114,6 +114,29 @@ Schema::ConvertToArrowSchema() const {
return arrow::schema(arrow_fields);
}
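// Serialize the whole schema back to a CollectionSchema proto. Per-field data
// is delegated to FieldMeta::ToProto(); the primary-key and dynamic-field
// flags are re-applied here because FieldMeta does not carry them.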
proto::schema::CollectionSchema
Schema::ToProto() const {
proto::schema::CollectionSchema schema_proto;
schema_proto.set_enable_dynamic_field(dynamic_field_id_opt_.has_value());
for (const auto& field_id : field_ids_) {
const auto& meta = fields_.at(field_id);
auto* field_proto = schema_proto.add_fields();
*field_proto = meta.ToProto();
if (primary_field_id_opt_.has_value() &&
field_id == primary_field_id_opt_.value()) {
field_proto->set_is_primary_key(true);
}
if (dynamic_field_id_opt_.has_value() &&
field_id == dynamic_field_id_opt_.value()) {
field_proto->set_is_dynamic(true);
}
}
return schema_proto;
}
std::unique_ptr<std::vector<FieldMeta>>
Schema::AbsentFields(Schema& old_schema) const {
std::vector<FieldMeta> result;

View File

@@ -126,6 +126,31 @@ class Schema {
return field_id;
}
// string type
FieldId
AddDebugVarcharField(const FieldName& name,
DataType data_type,
int64_t max_length,
bool nullable,
bool enable_match,
bool enable_analyzer,
std::map<std::string, std::string>& params,
std::optional<DefaultValueType> default_value) {
auto field_id = FieldId(debug_id);
debug_id++;
auto field_meta = FieldMeta(name,
field_id,
data_type,
max_length,
nullable,
enable_match,
enable_analyzer,
params,
std::move(default_value));
this->AddField(std::move(field_meta));
return field_id;
}
// scalar type
void
AddField(const FieldName& name,
@@ -303,6 +328,9 @@ class Schema {
const ArrowSchemaPtr
ConvertToArrowSchema() const;
proto::schema::CollectionSchema
ToProto() const;
void
UpdateLoadFields(const std::vector<int64_t>& field_ids) {
load_fields_.clear();

View File

@@ -156,6 +156,67 @@ GetDataTypeSize(DataType data_type, int dim = 1) {
}
}
// Convert internal DataType to proto schema DataType
inline proto::schema::DataType
ToProtoDataType(DataType data_type) {
switch (data_type) {
case DataType::NONE:
return proto::schema::DataType::None;
case DataType::BOOL:
return proto::schema::DataType::Bool;
case DataType::INT8:
return proto::schema::DataType::Int8;
case DataType::INT16:
return proto::schema::DataType::Int16;
case DataType::INT32:
return proto::schema::DataType::Int32;
case DataType::INT64:
return proto::schema::DataType::Int64;
case DataType::FLOAT:
return proto::schema::DataType::Float;
case DataType::DOUBLE:
return proto::schema::DataType::Double;
case DataType::STRING:
return proto::schema::DataType::String;
case DataType::VARCHAR:
return proto::schema::DataType::VarChar;
case DataType::ARRAY:
return proto::schema::DataType::Array;
case DataType::JSON:
return proto::schema::DataType::JSON;
case DataType::TEXT:
return proto::schema::DataType::Text;
case DataType::TIMESTAMPTZ:
return proto::schema::DataType::Timestamptz;
case DataType::VECTOR_BINARY:
return proto::schema::DataType::BinaryVector;
case DataType::VECTOR_FLOAT:
return proto::schema::DataType::FloatVector;
case DataType::VECTOR_FLOAT16:
return proto::schema::DataType::Float16Vector;
case DataType::VECTOR_BFLOAT16:
return proto::schema::DataType::BFloat16Vector;
case DataType::VECTOR_SPARSE_U32_F32:
return proto::schema::DataType::SparseFloatVector;
case DataType::VECTOR_INT8:
return proto::schema::DataType::Int8Vector;
case DataType::VECTOR_ARRAY:
return proto::schema::DataType::ArrayOfVector;
// Internal-only or unsupported mappings
case DataType::ROW:
default:
ThrowInfo(
DataTypeInvalid,
fmt::format(
"failed to convert to proto data type, invalid type {}",
data_type));
}
}
inline std::shared_ptr<arrow::DataType>
GetArrowDataType(DataType data_type, int dim = 1) {
switch (data_type) {

View File

@@ -86,95 +86,6 @@ class PlanNode {
using PlanNodePtr = std::shared_ptr<PlanNode>;
class SegmentNode : public PlanNode {
public:
SegmentNode(
const PlanNodeId& id,
const std::shared_ptr<milvus::segcore::SegmentInternalInterface>&
segment)
: PlanNode(id), segment_(segment) {
}
DataType
output_type() const override {
return DataType::ROW;
}
std::vector<std::shared_ptr<PlanNode>>
sources() const override {
return {};
}
std::string_view
name() const override {
return "SegmentNode";
}
std::string
ToString() const override {
return "SegmentNode";
}
private:
std::shared_ptr<milvus::segcore::SegmentInternalInterface> segment_;
};
class ValuesNode : public PlanNode {
public:
ValuesNode(const PlanNodeId& id,
const std::vector<RowVectorPtr>& values,
bool parallelizeable = false)
: PlanNode(id),
values_{std::move(values)},
output_type_(values[0]->type()) {
AssertInfo(!values.empty(), "ValueNode must has value");
}
ValuesNode(const PlanNodeId& id,
std::vector<RowVectorPtr>&& values,
bool parallelizeable = false)
: PlanNode(id),
values_{std::move(values)},
output_type_(values[0]->type()) {
AssertInfo(!values.empty(), "ValueNode must has value");
}
DataType
output_type() const override {
return output_type_;
}
const std::vector<RowVectorPtr>&
values() const {
return values_;
}
std::vector<PlanNodePtr>
sources() const override {
return {};
}
bool
parallelizable() {
return parallelizable_;
}
std::string_view
name() const override {
return "Values";
}
std::string
ToString() const override {
return "Values";
}
private:
DataType output_type_;
const std::vector<RowVectorPtr> values_;
bool parallelizable_;
};
class FilterNode : public PlanNode {
public:
FilterNode(const PlanNodeId& id,

View File

@@ -65,7 +65,6 @@ MergeExprWithNamespace(const SchemaPtr schema,
std::unique_ptr<VectorPlanNode>
ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
// TODO: add more buffs
Assert(plan_node_proto.has_vector_anns());
auto& anns_proto = plan_node_proto.vector_anns();
@@ -326,9 +325,7 @@ ProtoParser::RetrievePlanNodeFromProto(
sources));
sources = std::vector<milvus::plan::PlanNodePtr>{plannode};
} else {
auto expr_parser =
parse_expr_to_filter_node(query.predicates());
plannode = std::move(expr_parser);
plannode = parse_expr_to_filter_node(query.predicates());
sources = std::vector<milvus::plan::PlanNodePtr>{plannode};
}
}
@@ -362,8 +359,8 @@ ProtoParser::CreatePlan(const proto::plan::PlanNode& plan_node_proto) {
auto plan = std::make_unique<Plan>(schema);
auto plan_node = PlanNodeFromProto(plan_node_proto);
plan->tag2field_["$0"] = plan_node->search_info_.field_id_;
plan->plan_node_ = std::move(plan_node);
plan->tag2field_["$0"] = plan->plan_node_->search_info_.field_id_;
ExtractedPlanInfo extra_info(schema->size());
extra_info.add_involved_field(plan->plan_node_->search_info_.field_id_);
plan->extra_info_opt_ = std::move(extra_info);

View File

@@ -119,6 +119,33 @@ install(TARGETS all_tests DESTINATION unittest)
add_subdirectory(bench)
add_subdirectory(test_json_stats)
# Optionally include external scalar-benchmark project
option(ENABLE_SCALAR_BENCH "Enable fetching and building scalar-benchmark" OFF)
set(SCALAR_BENCHMARK_GIT_URL "https://github.com/zilliztech/scalar-benchmark" CACHE STRING "Scalar benchmark git repo URL")
set(SCALAR_BENCHMARK_GIT_TAG "main" CACHE STRING "Scalar benchmark git tag/branch")
if (ENABLE_SCALAR_BENCH)
include(FetchContent)
if (DEFINED SCALAR_BENCHMARK_SOURCE_DIR AND EXISTS ${SCALAR_BENCHMARK_SOURCE_DIR}/CMakeLists.txt)
message(STATUS "Using local scalar-benchmark from ${SCALAR_BENCHMARK_SOURCE_DIR}")
add_subdirectory(${SCALAR_BENCHMARK_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}/scalar-benchmark-build)
else()
message(STATUS "Fetching scalar-benchmark from ${SCALAR_BENCHMARK_GIT_URL} (${SCALAR_BENCHMARK_GIT_TAG})")
FetchContent_Declare(
scalar_benchmark
GIT_REPOSITORY ${SCALAR_BENCHMARK_GIT_URL}
GIT_TAG ${SCALAR_BENCHMARK_GIT_TAG}
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/scalar-benchmark-src
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/scalar-benchmark-build
)
FetchContent_GetProperties(scalar_benchmark)
if (NOT scalar_benchmark_POPULATED)
FetchContent_Populate(scalar_benchmark)
add_subdirectory(${scalar_benchmark_SOURCE_DIR} ${scalar_benchmark_BINARY_DIR})
endif()
endif()
endif()
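The Populate-then-add_subdirectory pairing above is the pre-CMake-3.14 spelling of this pattern (FetchContent_MakeAvailable would be the modern one-liner); the pinned SOURCE_DIR keeps the checkout at cmake_build/unittest/scalar-benchmark-src, which is the path the scalar-bench-ui Makefile target serves from. Passing -DSCALAR_BENCHMARK_SOURCE_DIR=/path/to/local/checkout skips the fetch entirely.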
# bitset unit test
include(CheckCXXCompilerFlag)
include(CheckIncludeFileCXX)

View File

@@ -37,4 +37,4 @@ target_link_libraries(indexbuilder_bench
pthread
)
target_link_libraries(indexbuilder_bench benchmark_main)
target_link_libraries(indexbuilder_bench benchmark_main)

View File

@@ -1,27 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstdint>
#include <benchmark/benchmark.h>
#include <string>
#include "common/type_c.h"
#include "segcore/segment_c.h"
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentSealed.h"
#include "test_cachinglayer/cachinglayer_test_utils.h"
#include "test_utils/DataGen.h"
#include "test_utils/storage_test_utils.h"
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
static int dim = 768;

View File

@@ -158,8 +158,10 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id,
for (auto i = 0; i < field_datas.size(); ++i) {
auto& field_data = field_datas[i];
row_count += field_data->Length();
auto file =
"./data/test" + std::to_string(field_id) + "/" + std::to_string(i);
auto file = "./data/test/" + std::to_string(collection_id) + "/" +
std::to_string(partition_id) + "/" +
std::to_string(segment_id) + "/" +
std::to_string(field_id) + "/" + std::to_string(i);
files.push_back(file);
row_counts.push_back(field_data->Length());
auto payload_reader =