diff --git a/Makefile b/Makefile index e379670146..61e4271260 100644 --- a/Makefile +++ b/Makefile @@ -386,15 +386,21 @@ run-test-cpp: @echo $(PWD)/scripts/run_cpp_unittest.sh arg=${filter} @(env bash $(PWD)/scripts/run_cpp_unittest.sh arg=${filter}) -# tool for benchmark -exprparser-tool: - @echo "Building exprparser helper ..." +plan-parser-so: + @echo "Building plan parser shared library ..." @source $(PWD)/scripts/setenv.sh && \ - mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \ - GO111MODULE=on $(GO) build -pgo=$(PGO_PATH)/default.pgo -ldflags="-r $${RPATH}" -o $(INSTALL_PATH)/exprparser $(PWD)/cmd/tools/exprparser/main.go 1>/dev/null + mkdir -p $(PWD)/internal/core/output/lib $(PWD)/internal/core/output/include && \ + go env -w CGO_ENABLED="1" && \ + GO111MODULE=on $(GO) build -buildmode=c-shared -o $(PWD)/internal/core/output/lib/libmilvus-planparser.so $(PWD)/internal/parser/planparserv2/cwrapper/wrapper.go && \ + mv $(PWD)/internal/core/output/lib/libmilvus-planparser.h $(PWD)/internal/core/output/include/libmilvus-planparser.h && \ + cp $(PWD)/internal/parser/planparserv2/cwrapper/milvus_plan_parser.h $(PWD)/internal/core/output/include/ && \ + g++ -shared -fPIC -o $(PWD)/internal/core/output/lib/libmilvus-planparser-cpp.so $(PWD)/internal/parser/planparserv2/cwrapper/milvus_plan_parser.cpp \ + -I$(PWD)/internal/core/output/include \ + -L$(PWD)/internal/core/output/lib -lmilvus-planparser \ + -Wl,-rpath,'$$ORIGIN' # Build unittest with external scalar-benchmark enabled -scalar-bench: generated-proto exprparser-tool +scalar-bench: generated-proto plan-parser-so @echo "Building Milvus cpp unittest with scalar-benchmark ... 
" @(export CMAKE_EXTRA_ARGS="-DENABLE_SCALAR_BENCH=ON"; env bash $(PWD)/scripts/core_build.sh -t ${mode} -a ${use_asan} -u -n ${use_disk_index} -y ${use_dynamic_simd} ${AZURE_OPTION} -x ${index_engine} -o ${use_opendal} -f $(tantivy_features)) diff --git a/cmd/tools/exprparser/main.go b/cmd/tools/exprparser/main.go deleted file mode 100644 index b79c0af053..0000000000 --- a/cmd/tools/exprparser/main.go +++ /dev/null @@ -1,115 +0,0 @@ -package main - -import ( - "bufio" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "os" - "strings" - - "google.golang.org/protobuf/proto" - - schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/parser/planparserv2" - _ "github.com/milvus-io/milvus/pkg/v2/proto/planpb" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" -) - -type parseRequest struct { - ID string `json:"id"` - Op string `json:"op"` - SchemaB64 string `json:"schema_b64"` - Expr string `json:"expr"` - Options struct { - IsCount bool `json:"is_count"` - Limit int64 `json:"limit"` - } `json:"options"` -} - -type parseResponse struct { - ID string `json:"id"` - OK bool `json:"ok"` - PlanB64 string `json:"plan_b64,omitempty"` - Error string `json:"error,omitempty"` -} - -func handle(line string) parseResponse { - line = strings.TrimSpace(line) - if line == "" { - return parseResponse{ID: "", OK: false, Error: "empty line"} - } - - var req parseRequest - if err := json.Unmarshal([]byte(line), &req); err != nil { - return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("invalid json: %v", err)} - } - if req.Op != "parse_expr" { - return parseResponse{ID: req.ID, OK: false, Error: "unsupported op"} - } - if req.SchemaB64 == "" { - return parseResponse{ID: req.ID, OK: false, Error: "missing schema_b64"} - } - if req.Expr == "" { - return parseResponse{ID: req.ID, OK: false, Error: "missing expr"} - } - - schemaBytes, err := base64.StdEncoding.DecodeString(req.SchemaB64) - if err != nil { - return 
parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("decode schema_b64 failed: %v", err)} - } - var schema schemapb.CollectionSchema - if err := proto.Unmarshal(schemaBytes, &schema); err != nil { - return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("unmarshal schema failed: %v", err)} - } - - helper, err := typeutil.CreateSchemaHelper(&schema) - if err != nil { - return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("schema helper error: %v", err)} - } - - planNode, err := planparserv2.CreateRetrievePlan(helper, req.Expr, nil) - if err != nil { - return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("parse error: %v", err)} - } - - // Apply options if provided - if q := planNode.GetQuery(); q != nil { - q.IsCount = req.Options.IsCount - if req.Options.Limit > 0 { - q.Limit = req.Options.Limit - } - } - - planBytes, err := proto.Marshal(planNode) - if err != nil { - return parseResponse{ID: req.ID, OK: false, Error: fmt.Sprintf("marshal plan failed: %v", err)} - } - return parseResponse{ID: req.ID, OK: true, PlanB64: base64.StdEncoding.EncodeToString(planBytes)} -} - -func writeResp(w *bufio.Writer, resp parseResponse) { - b, _ := json.Marshal(resp) - _, _ = w.Write(b) - _ = w.WriteByte('\n') - _ = w.Flush() -} - -func main() { - in := bufio.NewScanner(os.Stdin) - buf := make([]byte, 0, 1024*1024) - in.Buffer(buf, 16*1024*1024) - w := bufio.NewWriter(os.Stdout) - - for { - if !in.Scan() { - if err := in.Err(); err != nil && err != io.EOF { - writeResp(w, parseResponse{ID: "", OK: false, Error: fmt.Sprintf("scan error: %v", err)}) - } - break - } - resp := handle(in.Text()) - writeResp(w, resp) - } -} diff --git a/internal/parser/planparserv2/cwrapper/milvus_plan_parser.cpp b/internal/parser/planparserv2/cwrapper/milvus_plan_parser.cpp new file mode 100644 index 0000000000..23a813de53 --- /dev/null +++ b/internal/parser/planparserv2/cwrapper/milvus_plan_parser.cpp @@ -0,0 +1,86 @@ +// Copyright (C) 2019-2025 Zilliz. 
All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +//go:build exclude + +#include "milvus_plan_parser.h" + +#include +#include + +// This header is generated by the Go build (cgo) +// and is expected to be available in the include path. +extern "C" { + #include "libmilvus-planparser.h" +} + +namespace milvus { +namespace planparserv2 { + +SchemaHandle PlanParser::RegisterSchema(const std::vector& schema_proto) { + void* proto_blob = const_cast(static_cast(schema_proto.data())); + int len = static_cast(schema_proto.size()); + char* err_msg = nullptr; + + SchemaHandle handle = ::RegisterSchema(proto_blob, len, &err_msg); + if (handle == kInvalidSchemaHandle) { + std::string err_str = "Unknown error"; + if (err_msg != nullptr) { + err_str = std::string(err_msg); + ::Free(err_msg); + } + throw std::runtime_error("Failed to register schema: " + err_str); + } + + return handle; +} + +std::string PlanParser::UnregisterSchema(SchemaHandle handle) { + char* err_msg = nullptr; + + int result = ::UnregisterSchema(handle, &err_msg); + if (result != 0) { + std::string err = err_msg ? 
std::string(err_msg) : "unknown error"; + if (err_msg != nullptr) { + ::Free(err_msg); + } + return err; + } + return ""; +} + +std::vector PlanParser::Parse(SchemaHandle handle, const std::string& expr) { + char* c_expr = const_cast(expr.c_str()); + char* err_msg = nullptr; + int length = 0; + + void* result = ::Parse(handle, c_expr, &length, &err_msg); + if (result == nullptr) { + std::string err_str = "Unknown error"; + if (err_msg != nullptr) { + err_str = std::string(err_msg); + ::Free(err_msg); + } + throw std::runtime_error("Failed to parse expression: " + err_str); + } + + std::vector plan(length); + if (length > 0) { + std::memcpy(plan.data(), result, length); + } + + ::Free(result); + + return plan; +} + +} // namespace planparserv2 +} // namespace milvus diff --git a/internal/parser/planparserv2/cwrapper/milvus_plan_parser.h b/internal/parser/planparserv2/cwrapper/milvus_plan_parser.h new file mode 100644 index 0000000000..16204efbbb --- /dev/null +++ b/internal/parser/planparserv2/cwrapper/milvus_plan_parser.h @@ -0,0 +1,80 @@ +// Copyright (C) 2019-2025 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include + +namespace milvus { +namespace planparserv2 { + +// SchemaHandle is an opaque handle to a registered schema. +// Valid handles are > 0. A handle of 0 indicates an invalid/unregistered schema. 
+using SchemaHandle = int64_t; + +constexpr SchemaHandle kInvalidSchemaHandle = 0; + +// Thread-safe wrapper for the Go plan parser. +// +// Thread safety guarantees: +// - RegisterSchema: Can be called concurrently. Each call returns a unique handle. +// - UnregisterSchema: Can be called concurrently. Returns error if schema is in use or already unregistered. +// - Parse: Can be called concurrently. Uses lock-free reference counting internally. +// +// Usage: +// auto handle = PlanParser::RegisterSchema(schema_proto); +// auto plan = PlanParser::Parse(handle, "field > 10"); +// PlanParser::UnregisterSchema(handle); +class PlanParser { +public: + /** + * @brief Register a schema to the plan parser. + * + * Thread-safe. Each call returns a unique handle, even for identical schemas. + * The same schema can be registered multiple times, each with a different handle. + * + * @param schema_proto The serialized CollectionSchema protobuf. + * @return SchemaHandle A unique handle for the registered schema (> 0). + * @throws std::runtime_error if registration fails (e.g., invalid protobuf). + */ + static SchemaHandle RegisterSchema(const std::vector& schema_proto); + + /** + * @brief Unregister a schema from the plan parser. + * + * Thread-safe. Fails if the schema is currently being used by Parse() or already unregistered. + * + * @param handle The handle returned by RegisterSchema. + * @return Empty string on success, error message on failure. + */ + static std::string UnregisterSchema(SchemaHandle handle); + + /** + * @brief Parse an expression string into a serialized PlanNode protobuf. + * + * Thread-safe and lock-free. Multiple threads can call Parse() concurrently + * with the same or different handles. + * + * @param handle The handle returned by RegisterSchema. + * @param expr The expression string to parse. + * @return std::vector The serialized PlanNode protobuf. 
+ * @throws std::runtime_error if: + * - handle is invalid or not found + * - schema was unregistered + * - parsing fails + */ + static std::vector Parse(SchemaHandle handle, const std::string& expr); +}; + +} // namespace planparserv2 +} // namespace milvus diff --git a/internal/parser/planparserv2/cwrapper/wrapper.go b/internal/parser/planparserv2/cwrapper/wrapper.go new file mode 100644 index 0000000000..5249ddead3 --- /dev/null +++ b/internal/parser/planparserv2/cwrapper/wrapper.go @@ -0,0 +1,149 @@ +package main + +/* +#include +*/ +import "C" + +import ( + "sync" + "sync/atomic" + "unsafe" + + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/parser/planparserv2" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +type schemaEntry struct { + helper *typeutil.SchemaHelper + refCount int64 // >= 0: available, < 0: marked as deleted +} + +var ( + schemaMap sync.Map // map[int64]*schemaEntry + nextID int64 // atomic increment, starts from 0, first ID is 1 +) + +// acquireRef tries to acquire a reference to the schema entry. +// Returns true if successful, false if the schema is deleted or being deleted. +func (e *schemaEntry) acquireRef() bool { + for { + old := atomic.LoadInt64(&e.refCount) + if old < 0 { + return false + } + if atomic.CompareAndSwapInt64(&e.refCount, old, old+1) { + return true + } + } +} + +// releaseRef releases a reference to the schema entry. +func (e *schemaEntry) releaseRef() { + atomic.AddInt64(&e.refCount, -1) +} + +// tryMarkDeleted tries to mark the schema entry as deleted. +// Returns 0 if successful, 1 if in use, 2 if already deleted. 
+func (e *schemaEntry) tryMarkDeleted() int {
+	// Only an idle entry (refCount == 0) may be flipped to the deleted marker (-1).
+	if atomic.CompareAndSwapInt64(&e.refCount, 0, -1) {
+		return 0 // success
+	}
+	// The CAS did not apply: distinguish "someone already deleted it" from
+	// "readers currently hold references".
+	if atomic.LoadInt64(&e.refCount) < 0 {
+		return 2 // already deleted
+	}
+	return 1 // in use
+}
+
+// maxCInt is the largest length Parse reports through its C.int out-parameter.
+// It assumes a 32-bit C int, which holds on the platforms cgo currently targets.
+const maxCInt = 1<<31 - 1
+
+// setCError stores msg in *errMsg as a C string allocated by C.CString; the
+// caller is expected to release it with Free. A NULL errMsg is tolerated so a
+// caller that does not want the message cannot trigger a nil dereference
+// (which would panic inside the shared library and take down the host process).
+func setCError(errMsg **C.char, msg string) {
+	if errMsg != nil {
+		*errMsg = C.CString(msg)
+	}
+}
+
+// RegisterSchema unmarshals a serialized CollectionSchema from the length bytes
+// at protoBlob, builds a schema helper for it, and stores the helper under a
+// freshly allocated ID.
+//
+// It returns the new ID on success. On failure it returns 0 and, when errMsg is
+// non-NULL, sets *errMsg to a C string the caller must release with Free.
+//
+//export RegisterSchema
+func RegisterSchema(protoBlob unsafe.Pointer, length C.int, errMsg **C.char) C.longlong {
+	// C.GoBytes panics on a negative length and would read through a NULL
+	// pointer when length > 0, so reject both before touching the buffer.
+	if length < 0 || (protoBlob == nil && length > 0) {
+		setCError(errMsg, "invalid schema buffer")
+		return 0
+	}
+
+	blob := C.GoBytes(protoBlob, length)
+	schema := &schemapb.CollectionSchema{}
+	if err := proto.Unmarshal(blob, schema); err != nil {
+		setCError(errMsg, "failed to unmarshal schema: "+err.Error())
+		return 0
+	}
+
+	helper, err := typeutil.CreateSchemaHelper(schema)
+	if err != nil {
+		setCError(errMsg, "failed to create schema helper: "+err.Error())
+		return 0
+	}
+
+	id := atomic.AddInt64(&nextID, 1)
+	schemaMap.Store(id, &schemaEntry{helper: helper, refCount: 0})
+
+	return C.longlong(id)
+}
+
+// UnregisterSchema removes the schema registered under schemaID.
+//
+// It returns 0 on success. It returns -1 when the ID is unknown, when the
+// entry is currently referenced by Parse, or when it was already unregistered;
+// in those cases *errMsg (if errMsg is non-NULL) is set to a C string the
+// caller must release with Free.
+//
+//export UnregisterSchema
+func UnregisterSchema(schemaID C.longlong, errMsg **C.char) C.int {
+	id := int64(schemaID)
+
+	val, ok := schemaMap.Load(id)
+	if !ok {
+		setCError(errMsg, "schema not found")
+		return -1
+	}
+	entry := val.(*schemaEntry)
+
+	switch entry.tryMarkDeleted() {
+	case 0: // success
+		// The entry is now marked deleted, so no new reference can be
+		// acquired; drop it from the map.
+		schemaMap.Delete(id)
+		return 0
+	case 1: // in use
+		setCError(errMsg, "schema is in use")
+	case 2: // already deleted
+		setCError(errMsg, "schema already unregistered")
+	}
+
+	return -1
+}
+
+// Parse builds a retrieve plan for the expression exprStr against the schema
+// registered under schemaID and returns the marshaled plan in a buffer
+// allocated by C.CBytes, writing its size to *length. The caller must release
+// the buffer with Free.
+//
+// On failure it returns NULL and, when errMsg is non-NULL, sets *errMsg to a C
+// string the caller must release with Free. length must not be NULL.
+//
+//export Parse
+func Parse(schemaID C.longlong, exprStr *C.char, length *C.int, errMsg **C.char) unsafe.Pointer {
+	// Without a place to report the size the returned buffer would be
+	// unusable, so fail up front instead of dereferencing NULL later.
+	if length == nil {
+		setCError(errMsg, "length output pointer is null")
+		return nil
+	}
+
+	id := int64(schemaID)
+	goExprStr := C.GoString(exprStr)
+
+	val, ok := schemaMap.Load(id)
+	if !ok {
+		setCError(errMsg, "schema not found")
+		return nil
+	}
+	entry := val.(*schemaEntry)
+
+	// Hold a reference for the duration of the parse so a concurrent
+	// UnregisterSchema reports "in use" rather than deleting the entry.
+	if !entry.acquireRef() {
+		setCError(errMsg, "schema has been unregistered")
+		return nil
+	}
+	defer entry.releaseRef()
+
+	planNode, err := planparserv2.CreateRetrievePlan(entry.helper, goExprStr, nil)
+	if err != nil {
+		setCError(errMsg, err.Error())
+		return nil
+	}
+
+	planBytes, err := proto.Marshal(planNode)
+	if err != nil {
+		setCError(errMsg, "failed to marshal plan node: "+err.Error())
+		return nil
+	}
+
+	// The size travels through a C int; refuse plans that would not fit
+	// instead of reporting a truncated length to the caller.
+	if len(planBytes) > maxCInt {
+		setCError(errMsg, "serialized plan is too large")
+		return nil
+	}
+
+	*length = C.int(len(planBytes))
+	return C.CBytes(planBytes)
+}
+
+// Free releases memory previously handed to C by this library (error strings
+// and plan buffers) by calling C.free on it.
+//
+//export Free
+func Free(ptr unsafe.Pointer) {
+	C.free(ptr)
+}
+
+// main is intentionally empty; the file declares package main so it can be
+// built with -buildmode=c-shared.
+func main() {}