Add tracing for retrieve (#23033)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
Enwei Jiao authored on 2023-03-29 18:10:02 +08:00; committed by GitHub
parent 970dcb3161
commit 50dab2d394
11 changed files with 71 additions and 44 deletions

@@ -104,12 +104,18 @@ DeleteRetrieveResult(CRetrieveResult* retrieve_result) {
CStatus
Retrieve(CSegmentInterface c_segment,
CRetrievePlan c_plan,
CTraceContext c_trace,
uint64_t timestamp,
CRetrieveResult* result) {
try {
auto segment =
static_cast<const milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
auto ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.flag};
auto span = milvus::tracer::StartSpan("SegcoreRetrieve", &ctx);
auto retrieve_result = segment->Retrieve(plan, timestamp);
auto size = retrieve_result->ByteSizeLong();
@@ -118,6 +124,8 @@ Retrieve(CSegmentInterface c_segment,
result->proto_blob = buffer;
result->proto_size = size;
span->End();
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(UnexpectedError, e.what());

@@ -51,6 +51,7 @@ DeleteRetrieveResult(CRetrieveResult* retrieve_result);
CStatus
Retrieve(CSegmentInterface c_segment,
CRetrievePlan c_plan,
CTraceContext c_trace,
uint64_t timestamp,
CRetrieveResult* result);

@@ -384,7 +384,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -399,7 +399,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
FieldId(101), DataType::INT64, retrive_pks);
plan->plan_node_->predicate_ = std::move(term_expr);
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
@@ -424,7 +424,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
// retrieve pks in {2}
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
@@ -515,7 +515,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -530,7 +530,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
FieldId(101), DataType::INT64, retrive_pks);
plan->plan_node_->predicate_ = std::move(term_expr);
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
@@ -555,7 +555,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
// retrieve pks in {2}
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
@@ -615,7 +615,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -643,7 +643,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
// retrieve pks in {1, 2, 3}
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
@@ -717,7 +717,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -746,7 +746,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
// retrieve pks in {1, 2, 3}
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
@@ -815,7 +815,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -840,7 +840,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
// retrieve pks in {1, 2, 3}, timestamp = 19
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
@@ -933,7 +933,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
@@ -1119,8 +1119,8 @@ TEST(CApiTest, RetrieveTestWithExpr) {
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
auto res =
Retrieve(segment, plan.get(), dataset.timestamps_[0], &retrieve_result);
auto res = Retrieve(
segment, plan.get(), {}, dataset.timestamps_[0], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
DeleteRetrievePlan(plan.release());
@@ -1727,7 +1727,8 @@ TEST(CApiTest, LoadIndex_Search) {
milvus::segcore::LoadIndexInfo load_index_info;
auto& index_params = load_index_info.index_params;
index_params["index_type"] = "IVF_PQ";
load_index_info.index = std::make_unique<VectorMemIndex>(index_params["index_type"], knowhere::metric::L2);
load_index_info.index = std::make_unique<VectorMemIndex>(
index_params["index_type"], knowhere::metric::L2);
load_index_info.index->Load(binary_set);
// search
@@ -4033,7 +4034,7 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) {
CRetrieveResult retrieve_result;
res = Retrieve(
segment, plan.get(), raw_data.timestamps_[N - 1], &retrieve_result);
segment, plan.get(), {}, raw_data.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,

@@ -50,5 +50,9 @@ func startTracer(msg msgstream.TsMsg, name string) (context.Context, trace.Span)
if ctx == nil {
ctx = context.Background()
}
sp := trace.SpanFromContext(ctx)
if sp.SpanContext().IsValid() {
return ctx, sp
}
return otel.Tracer(typeutil.DataNodeRole).Start(ctx, name)
}
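Note: the hunk above reworks the datanode-side startTracer helper so that a message whose context already carries a valid span reuses that span instead of always starting a new root span. A minimal standalone sketch of the reuse-or-start pattern (the function name and tracer name here are illustrative only, not part of the commit):

package example

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// spanFromOrStart returns the span already attached to ctx when it is valid,
// and only starts a fresh span when there is none.
func spanFromOrStart(ctx context.Context, name string) (context.Context, trace.Span) {
	if ctx == nil {
		ctx = context.Background()
	}
	if sp := trace.SpanFromContext(ctx); sp.SpanContext().IsValid() {
		return ctx, sp
	}
	return otel.Tracer(name).Start(ctx, name)
}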

@@ -168,9 +168,10 @@ func (c *client) deliver(consumer *consumer, batchMax int) {
for _, msg := range msgs {
select {
case consumer.messageCh <- Message{
MsgID: msg.MsgID,
Payload: msg.Payload,
Topic: consumer.Topic()}:
MsgID: msg.MsgID,
Payload: msg.Payload,
Properties: msg.Properties,
Topic: consumer.Topic()}:
case <-c.closeCh:
return
}

@@ -256,6 +256,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
channel := ms.producerChannels[k]
for i := 0; i < len(v.Msgs); i++ {
spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
defer sp.End()
mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil {
@@ -268,17 +269,14 @@
}
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
InjectCtx(spanCtx, msg.Properties)
ms.producerLock.Lock()
if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil {
ms.producerLock.Unlock()
sp.RecordError(err)
sp.End()
return err
}
sp.End()
ms.producerLock.Unlock()
}
}
@@ -306,7 +304,6 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, erro
}
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
InjectCtx(spanCtx, msg.Properties)
ms.producerLock.Lock()
@@ -385,10 +382,8 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) {
Timestamp: tsMsg.BeginTs(),
})
ctx, sp := ExtractCtx(tsMsg, msg.Properties())
if ctx != nil {
tsMsg.SetTraceCtx(ctx)
}
ctx, _ := ExtractCtx(tsMsg, msg.Properties())
tsMsg.SetTraceCtx(ctx)
msgPack := MsgPack{
Msgs: []TsMsg{tsMsg},
@@ -400,8 +395,6 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) {
case <-ms.ctx.Done():
return
}
sp.End()
}
}
}
@@ -688,11 +681,6 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
continue
}
ctx, sp := ExtractCtx(tsMsg, msg.Properties())
if ctx != nil {
tsMsg.SetTraceCtx(ctx)
}
ms.chanMsgBufMutex.Lock()
ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
ms.chanMsgBufMutex.Unlock()
@@ -701,10 +689,8 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
ms.chanTtMsgTimeMutex.Lock()
ms.chanTtMsgTime[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
ms.chanTtMsgTimeMutex.Unlock()
sp.End()
return
}
sp.End()
}
}
}
@@ -810,6 +796,9 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*msgpb.MsgPosition) error {
runLoop = false
break
} else if tsMsg.BeginTs() > mp.Timestamp {
ctx, _ := ExtractCtx(tsMsg, msg.Properties())
tsMsg.SetTraceCtx(ctx)
tsMsg.SetPosition(&MsgPosition{
ChannelName: filepath.Base(msg.Topic()),
MsgID: msg.ID().Serialize(),

@@ -31,13 +31,13 @@ import (
func ExtractCtx(msg TsMsg, properties map[string]string) (context.Context, trace.Span) {
ctx := msg.TraceCtx()
if ctx == nil {
return ctx, trace.SpanFromContext(ctx)
ctx = context.Background()
}
if !allowTrace(msg) {
return ctx, trace.SpanFromContext(ctx)
}
ctx = otel.GetTextMapPropagator().Extract(msg.TraceCtx(), propagation.MapCarrier(properties))
name := "receive msg"
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(properties))
name := "ReceieveMsg"
return otel.Tracer(name).Start(ctx, name, trace.WithAttributes(
attribute.Int64("ID", msg.ID()),
attribute.String("Type", msg.Type().String()),
@@ -63,7 +63,7 @@ func MsgSpanFromCtx(ctx context.Context, msg TsMsg) (context.Context, trace.Span
if !allowTrace(msg) {
return ctx, trace.SpanFromContext(ctx)
}
operationName := "send msg"
operationName := "SendMsg"
opts := trace.WithAttributes(
attribute.Int64("ID", msg.ID()),
attribute.String("Type", msg.Type().String()),
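The two helpers above are the endpoints of the same pipe: MsgSpanFromCtx opens the "SendMsg" span on the producer side and InjectCtx (called in Produce/Broadcast above) serializes its context into the message's Properties map, while ExtractCtx reads those properties on the consumer side and starts the receive-side span as a child. A minimal sketch of that inject/extract round trip over a plain map, assuming the W3C TraceContext propagator is the one configured globally (standalone illustration, not the commit's code):

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Assumed propagator; Milvus' actual choice comes from its tracer setup.
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// Stand-in for the producer's "SendMsg" span context.
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    trace.TraceID{0x01},
		SpanID:     trace.SpanID{0x02},
		TraceFlags: trace.FlagsSampled,
	})
	sendCtx := trace.ContextWithSpanContext(context.Background(), sc)

	// Producer side (cf. InjectCtx): write the trace context into the
	// message properties, a plain map[string]string.
	props := map[string]string{}
	otel.GetTextMapPropagator().Inject(sendCtx, propagation.MapCarrier(props))

	// Consumer side (cf. ExtractCtx): rebuild the context from the same map.
	recvCtx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.MapCarrier(props))
	fmt.Println(trace.SpanContextFromContext(recvCtx).TraceID() == sc.TraceID()) // true
}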

@@ -353,11 +353,21 @@ func (s *Segment) retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.
return nil, fmt.Errorf("%w(segmentID=%d)", ErrSegmentUnhealthy, s.segmentID)
}
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
}
var retrieveResult RetrieveResult
ts := C.uint64_t(plan.Timestamp)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult)
status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, traceCtx, ts, &retrieveResult.cRetrieveResult)
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel).Observe(float64(tr.CtxElapse(ctx, "finish cgoRetrieve").Milliseconds()))
log.Debug("do retrieve on segment",
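In the hunk above, the span's identifiers are copied into local variables and handed to segcore as raw pointers: in the OpenTelemetry Go SDK a TraceID is a 16-byte array, a SpanID an 8-byte array, and TraceFlags a single byte, and the local copies are needed because the values returned by SpanContext().TraceID()/SpanID() are not addressable, so &traceID[0] must point at a local array. A tiny sketch of those types (nothing below is from the commit; the CTraceContext field names are taken from the cgo struct literal above):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

func main() {
	var tid trace.TraceID   // [16]byte, passed as CTraceContext.traceID
	var sid trace.SpanID    // [8]byte, passed as CTraceContext.spanID
	var fl trace.TraceFlags // one byte of W3C flags, passed as the uchar flag
	fmt.Println(len(tid), len(sid), byte(fl)) // 16 8 0
}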

@@ -34,7 +34,7 @@ func retrieveOnSegments(ctx context.Context, manager *Manager, segType SegmentTy
if segment == nil {
continue
}
result, err := segment.Retrieve(plan)
result, err := segment.Retrieve(ctx, plan)
if err != nil {
return nil, err
}

@@ -363,7 +363,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
return &searchResult, nil
}
func (s *LocalSegment) Retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
s.mut.RLock()
defer s.mut.RUnlock()
@@ -377,12 +377,23 @@ func (s *LocalSegment) Retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults,
zap.Int64("segmentID", s.ID()),
)
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
}
var retrieveResult RetrieveResult
ts := C.uint64_t(plan.Timestamp)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
status := C.Retrieve(s.ptr,
plan.cRetrievePlan,
traceCtx,
ts,
&retrieveResult.cRetrieveResult,
)

@@ -207,6 +207,8 @@ func (node *QueryNode) InitSegcore() {
if len(mmapDirPath) > 0 {
log.Info("mmap enabled", zap.String("dir", mmapDirPath))
}
initcore.InitTraceConfig(paramtable.Get())
}
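InitTraceConfig presumably forwards the tracing settings from paramtable into segcore so that spans started on the C++ side (for example the "SegcoreRetrieve" span added in this commit) are exported alongside the Go-side spans. As a rough Go-side analogue of that kind of setup, offered only as an assumption-laden sketch (the exporter below is a placeholder, not what paramtable actually configures):

package main

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Placeholder exporter; a real deployment would pick Jaeger/OTLP/etc.
	// based on configuration.
	exp, err := stdouttrace.New()
	if err != nil {
		panic(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.TraceContext{})
}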
// Init function init historical and streaming module to manage segments