feat: Supports tracing services that require header-based authentication. (#43211)

issue: https://github.com/milvus-io/milvus/issues/43082

support tracing services that require header-based authentication.
for example: aliyun SLS, volcengine LogService etc...

[aliyun
SLS](https://help.aliyun.com/zh/sls/import-trace-data-from-golang-applications-to-log-service-by-using-opentelemetry-sdk-for-golang?spm=a2c4g.11186623.help-menu-search-28958.d_1#section-ktk-xxz-8om)

Add a headers config in trace config

```
trace:
  exporter: otlp
  sampleFraction: 1
  otlp:
    endpoint:  milvus-cn-beijing-pre.cn-beijing.log.aliyuncs.com:10010
    method:  # otlp export method, acceptable values: ["grpc", "http"],  using "grpc" by default
    secure: true
    headers:  # base64
  initTimeoutSeconds: 10
```

it is encoded as base64, raw data is json
```
{
    "x-sls-otel-project": "milvus-cn-beijing-pre",
    "x-sls-otel-instance-id": "milvus-cn-beijing-pre",
    "x-sls-otel-ak-id": "xxx",
    "x-sls-otel-ak-secret": "xxx"
}
```

[volcengine
tls](https://www.volcengine.com/docs/6470/812322#grpc-%E5%8D%8F%E8%AE%AE%E5%88%9D%E5%A7%8B%E5%8C%96%E7%A4%BA%E4%BE%8B)

Add a headers config in trace config

```
trace:
  exporter: otlp
  sampleFraction: 1
  otlp:
    endpoint:  xxx
    method:  # otlp export method, acceptable values: ["grpc", "http"],  using "grpc" by default
    secure: true
    headers:  # base64
  initTimeoutSeconds: 10
```

it is encoded as base64, raw data is json
```
{
    "x-tls-otel-region": "cn-beijing",
    "x-tls-otel-tracetopic": "milvus-cn-beijing-pre",
    "x-tls-otel-ak": "xxx",
    "x-tls-otel-sk": "xxx"
}
```

Signed-off-by: PjJinchen <6268414+pj1987111@users.noreply.github.com>
This commit is contained in:
PjJinchen 2025-07-10 17:32:48 +08:00 committed by GitHub
parent 85c8049296
commit a90694165b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 279 additions and 2 deletions

View File

@ -1197,6 +1197,7 @@ trace:
endpoint: # example: "127.0.0.1:4317" for grpc, "127.0.0.1:4318" for http
method: # otlp export method, acceptable values: ["grpc", "http"], using "grpc" by default
secure: true
headers: # otlp header that encoded in base64
initTimeoutSeconds: 10 # segcore initialization timeout in seconds, preventing otlp grpc hangs forever
#when using GPU indexing, Milvus will utilize a memory pool to avoid frequent memory allocation and deallocation.

View File

@ -13,12 +13,15 @@
#include <opentelemetry/exporters/otlp/otlp_http_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_options.h>
#include "log/Log.h"
#include "nlohmann/json.hpp"
#include <atomic>
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <utility>
#include <sstream>
#include <algorithm>
#include <cctype>
#include "opentelemetry/exporters/jaeger/jaeger_exporter_factory.h"
#include "opentelemetry/exporters/ostream/span_exporter_factory.h"
@ -64,12 +67,24 @@ initTelemetry(const TraceConfig& cfg) {
if (cfg.otlpMethod == "http") {
auto opts = otlp::OtlpHttpExporterOptions{};
opts.url = cfg.otlpEndpoint;
auto headers_map = parseHeaders(cfg.otlpHeaders);
if (!headers_map.empty()) {
for (const auto& pair : headers_map) {
opts.http_headers.insert(std::pair<std::string, std::string>(pair.first, pair.second));
}
}
exporter = otlp::OtlpHttpExporterFactory::Create(opts);
LOG_INFO("init otlp http exporter, endpoint: {}", opts.url);
} else if (cfg.otlpMethod == "grpc" ||
cfg.otlpMethod == "") { // legacy configuration
auto opts = otlp::OtlpGrpcExporterOptions{};
opts.endpoint = cfg.otlpEndpoint;
auto headers_map = parseHeaders(cfg.otlpHeaders);
if (!headers_map.empty()) {
for (const auto& pair : headers_map) {
opts.metadata.insert(std::pair<std::string, std::string>(pair.first, pair.second));
}
}
opts.use_ssl_credentials = cfg.oltpSecure;
exporter = otlp::OtlpGrpcExporterFactory::Create(opts);
LOG_INFO("init otlp grpc exporter, endpoint: {}", opts.endpoint);
@ -277,4 +292,20 @@ AutoSpan::~AutoSpan() {
}
}
std::map<std::string, std::string>
parseHeaders(const std::string& headers) {
if (headers.empty()) {
return {};
}
try {
nlohmann::json json = nlohmann::json::parse(headers);
return json.get<std::map<std::string, std::string>>();
} catch (const std::exception& e) {
// Log the parsing error and return empty map
LOG_ERROR("Failed to parse headers as JSON: {}, error: {}", headers, e.what());
return {};
}
}
} // namespace milvus::tracer

View File

@ -13,6 +13,7 @@
#include <memory>
#include <string>
#include <map>
#include "opentelemetry/trace/provider.h"
@ -26,6 +27,7 @@ struct TraceConfig {
std::string jaegerURL;
std::string otlpEndpoint;
std::string otlpMethod;
std::string otlpHeaders;
bool oltpSecure;
int nodeID;
@ -83,6 +85,9 @@ GetSpanIDAsHexStr(const TraceContext* ctx);
std::string
GetTraceID();
std::map<std::string, std::string>
parseHeaders(const std::string& headers);
struct AutoSpan {
explicit AutoSpan(const std::string& name,
TraceContext* ctx = nullptr,

View File

@ -110,6 +110,7 @@ InitTrace(CTraceConfig* config) {
config->jaegerURL,
config->otlpEndpoint,
config->otlpMethod,
config->otlpHeaders,
config->oltpSecure,
config->nodeID};
std::call_once(
@ -127,6 +128,7 @@ SetTrace(CTraceConfig* config) {
config->jaegerURL,
config->otlpEndpoint,
config->otlpMethod,
config->otlpHeaders,
config->oltpSecure,
config->nodeID};
milvus::tracer::initTelemetry(traceConfig);

View File

@ -122,6 +122,7 @@ typedef struct CTraceConfig {
const char* jaegerURL;
const char* otlpEndpoint;
const char* otlpMethod;
const char* otlpHeaders;
bool oltpSecure;
int nodeID;

View File

@ -1,4 +1,3 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
@ -134,3 +133,176 @@ TEST(Tracer, GetTraceID) {
trace_id = GetTraceID();
ASSERT_TRUE(trace_id.empty());
}
TEST(Tracer, ParseHeaders) {
// Test empty headers
auto headers_map = parseHeaders("");
ASSERT_TRUE(headers_map.empty());
// Test simple JSON headers
std::string json_headers = R"({"Authorization": "Bearer token123", "Content-Type": "application/json"})";
headers_map = parseHeaders(json_headers);
ASSERT_EQ(headers_map.size(), 2);
ASSERT_EQ(headers_map["Authorization"], "Bearer token123");
ASSERT_EQ(headers_map["Content-Type"], "application/json");
// Test JSON with whitespace
std::string json_headers_with_spaces = R"({ "key1" : "value1" , "key2" : "value2" })";
headers_map = parseHeaders(json_headers_with_spaces);
ASSERT_EQ(headers_map.size(), 2);
ASSERT_EQ(headers_map["key1"], "value1");
ASSERT_EQ(headers_map["key2"], "value2");
// Test invalid JSON
std::string invalid_json = "invalid json string";
headers_map = parseHeaders(invalid_json);
ASSERT_TRUE(headers_map.empty());
// Test empty JSON object
std::string empty_json = "{}";
headers_map = parseHeaders(empty_json);
ASSERT_TRUE(headers_map.empty());
}
TEST(Tracer, OTLPHttpExporter) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "otlp";
config->otlpMethod = "http";
config->otlpEndpoint = "http://localhost:4318/v1/traces";
config->otlpHeaders = R"({"Authorization": "Bearer test-token", "Content-Type": "application/json"})";
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_otlp_http");
ASSERT_TRUE(span->IsRecording());
// Test with empty headers
config->otlpHeaders = "";
initTelemetry(*config);
span = StartSpan("test_otlp_http_empty_headers");
ASSERT_TRUE(span->IsRecording());
// Test with invalid JSON headers
config->otlpHeaders = "invalid json";
initTelemetry(*config);
span = StartSpan("test_otlp_http_invalid_headers");
ASSERT_TRUE(span->IsRecording());
}
TEST(Tracer, OTLPGrpcExporter) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "otlp";
config->otlpMethod = "grpc";
config->otlpEndpoint = "localhost:4317";
config->otlpHeaders = R"({"Authorization": "Bearer grpc-token"})";
config->oltpSecure = false;
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_otlp_grpc");
ASSERT_TRUE(span->IsRecording());
// Test with secure connection
config->oltpSecure = true;
initTelemetry(*config);
span = StartSpan("test_otlp_grpc_secure");
ASSERT_TRUE(span->IsRecording());
// Test with empty headers
config->otlpHeaders = "";
config->oltpSecure = false;
initTelemetry(*config);
span = StartSpan("test_otlp_grpc_empty_headers");
ASSERT_TRUE(span->IsRecording());
}
TEST(Tracer, OTLPLegacyConfiguration) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "otlp";
config->otlpMethod = ""; // legacy configuration
config->otlpEndpoint = "localhost:4317";
config->otlpHeaders = R"({"legacy": "header"})";
config->oltpSecure = false;
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_otlp_legacy");
ASSERT_TRUE(span->IsRecording());
}
TEST(Tracer, OTLPInvalidMethod) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "otlp";
config->otlpMethod = "invalid_method";
config->otlpEndpoint = "localhost:4317";
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_otlp_invalid");
// Should fall back to noop provider when export creation fails
ASSERT_FALSE(span->IsRecording());
}
TEST(Tracer, OTLPComplexHeaders) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "otlp";
config->otlpMethod = "http";
config->otlpEndpoint = "http://localhost:4318/v1/traces";
config->otlpHeaders = R"({
"Authorization": "Bearer complex-token-123",
"X-Custom-Header": "custom-value",
"User-Agent": "Milvus-Tracer/1.0",
"Accept": "application/json"
})";
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_otlp_complex_headers");
ASSERT_TRUE(span->IsRecording());
}
TEST(Tracer, OTLPEmptyExporter) {
auto config = std::make_shared<TraceConfig>();
config->exporter = ""; // empty exporter
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_empty_exporter");
// Should fall back to noop provider
ASSERT_FALSE(span->IsRecording());
}
TEST(Tracer, OTLPInvalidExporter) {
auto config = std::make_shared<TraceConfig>();
config->exporter = "invalid_exporter";
config->nodeID = 1;
initTelemetry(*config);
auto span = StartSpan("test_invalid_exporter");
// Should fall back to noop provider
ASSERT_FALSE(span->IsRecording());
}
TEST(Tracer, OTLPHeadersParsingEdgeCases) {
// Test with whitespace in JSON
std::string json_with_spaces = R"({ "key1" : "value1" , "key2" : "value2" })";
auto headers_map = parseHeaders(json_with_spaces);
ASSERT_EQ(headers_map.size(), 2);
ASSERT_EQ(headers_map["key1"], "value1");
ASSERT_EQ(headers_map["key2"], "value2");
// Test with nested JSON (should fail gracefully)
std::string nested_json = R"({"key": {"nested": "value"}})";
headers_map = parseHeaders(nested_json);
ASSERT_TRUE(headers_map.empty());
// Test with array JSON (should fail gracefully)
std::string array_json = R"(["header1", "header2"])";
headers_map = parseHeaders(array_json);
ASSERT_TRUE(headers_map.empty());
// Test with null JSON
std::string null_json = "null";
headers_map = parseHeaders(null_json);
ASSERT_TRUE(headers_map.empty());
}

View File

@ -29,6 +29,7 @@ package initcore
import "C"
import (
"encoding/base64"
"fmt"
"path"
"time"
@ -55,10 +56,12 @@ func InitTraceConfig(params *paramtable.ComponentParam) {
otlpMethod := C.CString(params.TraceCfg.OtlpMethod.GetValue())
endpoint := C.CString(params.TraceCfg.OtlpEndpoint.GetValue())
otlpSecure := params.TraceCfg.OtlpSecure.GetAsBool()
otlpHeaders := C.CString(serializeHeaders(params.TraceCfg.OtlpHeaders.GetValue()))
defer C.free(unsafe.Pointer(exporter))
defer C.free(unsafe.Pointer(jaegerURL))
defer C.free(unsafe.Pointer(endpoint))
defer C.free(unsafe.Pointer(otlpMethod))
defer C.free(unsafe.Pointer(otlpHeaders))
config := C.CTraceConfig{
exporter: exporter,
@ -66,6 +69,7 @@ func InitTraceConfig(params *paramtable.ComponentParam) {
jaegerURL: jaegerURL,
otlpEndpoint: endpoint,
otlpMethod: otlpMethod,
otlpHeaders: otlpHeaders,
oltpSecure: (C.bool)(otlpSecure),
nodeID: nodeID,
}
@ -86,10 +90,12 @@ func ResetTraceConfig(params *paramtable.ComponentParam) {
endpoint := C.CString(params.TraceCfg.OtlpEndpoint.GetValue())
otlpMethod := C.CString(params.TraceCfg.OtlpMethod.GetValue())
otlpSecure := params.TraceCfg.OtlpSecure.GetAsBool()
otlpHeaders := C.CString(serializeHeaders(params.TraceCfg.OtlpHeaders.GetValue()))
defer C.free(unsafe.Pointer(exporter))
defer C.free(unsafe.Pointer(jaegerURL))
defer C.free(unsafe.Pointer(endpoint))
defer C.free(unsafe.Pointer(otlpMethod))
defer C.free(unsafe.Pointer(otlpHeaders))
config := C.CTraceConfig{
exporter: exporter,
@ -97,6 +103,7 @@ func ResetTraceConfig(params *paramtable.ComponentParam) {
jaegerURL: jaegerURL,
otlpEndpoint: endpoint,
otlpMethod: otlpMethod,
otlpHeaders: otlpHeaders,
oltpSecure: (C.bool)(otlpSecure),
nodeID: nodeID,
}
@ -333,3 +340,14 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
log.Warn(logMsg)
return errors.New(finalMsg)
}
func serializeHeaders(headerstr string) string {
if len(headerstr) == 0 {
return ""
}
decodeheaders, err := base64.StdEncoding.DecodeString(headerstr)
if err != nil {
return headerstr
}
return string(decodeheaders)
}

View File

@ -18,6 +18,8 @@ package tracer
import (
"context"
"encoding/base64"
"encoding/json"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
@ -77,6 +79,34 @@ func SetTracerProvider(exp sdk.SpanExporter, traceIDRatio float64) {
otel.SetTracerProvider(tp)
}
// parseHeaders parses base64-encoded JSON headers string into map[string]string
func parseHeaders(headers string) map[string]string {
if headers == "" {
return nil
}
// Try to decode as base64 first
decodeheaders, err := base64.StdEncoding.DecodeString(headers)
if err != nil {
log.Warn("Failed to decode base64 headers, trying to parse as JSON directly", zap.Error(err))
// Try to parse headers as JSON directly
var headersMap map[string]string
if jsonErr := json.Unmarshal([]byte(headers), &headersMap); jsonErr == nil {
return headersMap
}
log.Warn("Failed to parse headers as JSON", zap.Error(err))
return nil
}
// Parse decoded JSON into map[string]string
var headersMap map[string]string
if jsonErr := json.Unmarshal(decodeheaders, &headersMap); jsonErr == nil {
return headersMap
}
log.Warn("Failed to parse decoded headers as JSON", zap.Error(err))
return nil
}
func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter, error) {
var exp sdk.SpanExporter
var err error
@ -87,6 +117,7 @@ func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter,
jaeger.WithEndpoint(params.TraceCfg.JaegerURL.GetValue())))
case "otlp":
secure := params.TraceCfg.OtlpSecure.GetAsBool()
headers := params.TraceCfg.OtlpHeaders.GetValue()
switch params.TraceCfg.OtlpMethod.GetValue() {
case "", "grpc":
opts := []otlptracegrpc.Option{
@ -95,6 +126,9 @@ func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter,
if !secure {
opts = append(opts, otlptracegrpc.WithInsecure())
}
if headersMap := parseHeaders(headers); headersMap != nil {
opts = append(opts, otlptracegrpc.WithHeaders(headersMap))
}
exp, err = otlptracegrpc.New(context.Background(), opts...)
case "http":
opts := []otlptracehttp.Option{
@ -103,6 +137,9 @@ func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter,
if !secure {
opts = append(opts, otlptracehttp.WithInsecure())
}
if headersMap := parseHeaders(headers); headersMap != nil {
opts = append(opts, otlptracehttp.WithHeaders(headersMap))
}
exp, err = otlptracehttp.New(context.Background(), opts...)
default:
return nil, errors.Newf("otlp method not supported: %s", params.TraceCfg.OtlpMethod.GetValue())

View File

@ -1144,6 +1144,7 @@ type traceConfig struct {
OtlpEndpoint ParamItem `refreshable:"false"`
OtlpMethod ParamItem `refreshable:"false"`
OtlpSecure ParamItem `refreshable:"false"`
OtlpHeaders ParamItem `refreshable:"false"`
InitTimeoutSeconds ParamItem `refreshable:"false"`
}
@ -1202,6 +1203,15 @@ Fractions >= 1 will always sample. Fractions < 0 are treated as zero.`,
}
t.OtlpSecure.Init(base.mgr)
t.OtlpHeaders = ParamItem{
Key: "trace.otlp.headers",
Version: "2.4.0",
DefaultValue: "",
Doc: "otlp header that encoded in base64",
Export: true,
}
t.OtlpHeaders.Init(base.mgr)
t.InitTimeoutSeconds = ParamItem{
Key: "trace.initTimeoutSeconds",
Version: "2.4.4",