From 50dab2d3949150d5ba39895aff8a9d6d0c0ba3cd Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Wed, 29 Mar 2023 18:10:02 +0800 Subject: [PATCH] Add tracing for retrieve (#23033) Signed-off-by: Enwei Jiao --- internal/core/src/segcore/segment_c.cpp | 8 +++++ internal/core/src/segcore/segment_c.h | 1 + internal/core/unittest/test_c_api.cpp | 35 ++++++++++--------- internal/datanode/util.go | 4 +++ .../mq/mqimpl/rocksmq/client/client_impl.go | 7 ++-- internal/mq/msgstream/mq_msgstream.go | 23 ++++-------- internal/mq/msgstream/trace.go | 8 ++--- internal/querynode/segment.go | 12 ++++++- internal/querynodev2/segments/retrieve.go | 2 +- internal/querynodev2/segments/segment.go | 13 ++++++- internal/querynodev2/server.go | 2 ++ 11 files changed, 71 insertions(+), 44 deletions(-) diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 74d33dee7d..30d264c9ed 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -104,12 +104,18 @@ DeleteRetrieveResult(CRetrieveResult* retrieve_result) { CStatus Retrieve(CSegmentInterface c_segment, CRetrievePlan c_plan, + CTraceContext c_trace, uint64_t timestamp, CRetrieveResult* result) { try { auto segment = static_cast(c_segment); auto plan = static_cast(c_plan); + + auto ctx = milvus::tracer::TraceContext{ + c_trace.traceID, c_trace.spanID, c_trace.flag}; + auto span = milvus::tracer::StartSpan("SegcoreRetrieve", &ctx); + auto retrieve_result = segment->Retrieve(plan, timestamp); auto size = retrieve_result->ByteSizeLong(); @@ -118,6 +124,8 @@ Retrieve(CSegmentInterface c_segment, result->proto_blob = buffer; result->proto_size = size; + + span->End(); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(UnexpectedError, e.what()); diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 8ae75b3264..17ec12ae28 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -51,6 +51,7 @@ DeleteRetrieveResult(CRetrieveResult* retrieve_result); CStatus Retrieve(CSegmentInterface c_segment, CRetrievePlan c_plan, + CTraceContext c_trace, uint64_t timestamp, CRetrieveResult* result); diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 05fd7cf186..0108f693b7 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -384,7 +384,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -399,7 +399,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { FieldId(101), DataType::INT64, retrive_pks); plan->plan_node_->predicate_ = std::move(term_expr); res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); @@ -424,7 +424,7 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { // retrieve pks in {2} res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); @@ -515,7 +515,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -530,7 +530,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) { FieldId(101), DataType::INT64, retrive_pks); plan->plan_node_->predicate_ = std::move(term_expr); res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); @@ -555,7 +555,7 @@ TEST(CApiTest, MultiDeleteSealedSegment) { // retrieve pks in {2} res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); @@ -615,7 +615,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -643,7 +643,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { // retrieve pks in {1, 2, 3} res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); @@ -717,7 +717,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -746,7 +746,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { // retrieve pks in {1, 2, 3} res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); @@ -815,7 +815,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -840,7 +840,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { // retrieve pks in {1, 2, 3}, timestamp = 19 res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); @@ -933,7 +933,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, @@ -1119,8 +1119,8 @@ TEST(CApiTest, RetrieveTestWithExpr) { plan->field_ids_ = target_field_ids; CRetrieveResult retrieve_result; - auto res = - Retrieve(segment, plan.get(), dataset.timestamps_[0], &retrieve_result); + auto res = Retrieve( + segment, plan.get(), {}, dataset.timestamps_[0], &retrieve_result); ASSERT_EQ(res.error_code, Success); DeleteRetrievePlan(plan.release()); @@ -1727,7 +1727,8 @@ TEST(CApiTest, LoadIndex_Search) { milvus::segcore::LoadIndexInfo load_index_info; auto& index_params = load_index_info.index_params; index_params["index_type"] = "IVF_PQ"; - load_index_info.index = std::make_unique(index_params["index_type"], knowhere::metric::L2); + load_index_info.index = std::make_unique( + index_params["index_type"], knowhere::metric::L2); load_index_info.index->Load(binary_set); // search @@ -4033,7 +4034,7 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) { CRetrieveResult retrieve_result; res = Retrieve( - segment, plan.get(), raw_data.timestamps_[N - 1], &retrieve_result); + segment, plan.get(), {}, raw_data.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, diff --git a/internal/datanode/util.go b/internal/datanode/util.go index 44ae37d2b9..db1d53ae67 100644 --- a/internal/datanode/util.go +++ b/internal/datanode/util.go @@ -50,5 +50,9 @@ func startTracer(msg msgstream.TsMsg, name string) (context.Context, trace.Span) if ctx == nil { ctx = context.Background() } + sp := trace.SpanFromContext(ctx) + if sp.SpanContext().IsValid() { + return ctx, sp + } return otel.Tracer(typeutil.DataNodeRole).Start(ctx, name) } diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl.go b/internal/mq/mqimpl/rocksmq/client/client_impl.go index 345b3dffbb..446161075a 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl.go @@ -168,9 +168,10 @@ func (c *client) deliver(consumer *consumer, batchMax int) { for _, msg := range msgs { select { case consumer.messageCh <- Message{ - MsgID: msg.MsgID, - Payload: msg.Payload, - Topic: consumer.Topic()}: + MsgID: msg.MsgID, + Payload: msg.Payload, + Properties: msg.Properties, + Topic: consumer.Topic()}: case <-c.closeCh: return } diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index d212ca1b6d..451a4fe276 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -256,6 +256,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { channel := ms.producerChannels[k] for i := 0; i < len(v.Msgs); i++ { spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i]) + defer sp.End() mb, err := v.Msgs[i].Marshal(v.Msgs[i]) if err != nil { @@ -268,17 +269,14 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { } msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - InjectCtx(spanCtx, msg.Properties) ms.producerLock.Lock() if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil { ms.producerLock.Unlock() sp.RecordError(err) - sp.End() return err } - sp.End() ms.producerLock.Unlock() } } @@ -306,7 +304,6 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, erro } msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - InjectCtx(spanCtx, msg.Properties) ms.producerLock.Lock() @@ -385,10 +382,8 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { Timestamp: tsMsg.BeginTs(), }) - ctx, sp := ExtractCtx(tsMsg, msg.Properties()) - if ctx != nil { - tsMsg.SetTraceCtx(ctx) - } + ctx, _ := ExtractCtx(tsMsg, msg.Properties()) + tsMsg.SetTraceCtx(ctx) msgPack := MsgPack{ Msgs: []TsMsg{tsMsg}, @@ -400,8 +395,6 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { case <-ms.ctx.Done(): return } - - sp.End() } } } @@ -688,11 +681,6 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { continue } - ctx, sp := ExtractCtx(tsMsg, msg.Properties()) - if ctx != nil { - tsMsg.SetTraceCtx(ctx) - } - ms.chanMsgBufMutex.Lock() ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg) ms.chanMsgBufMutex.Unlock() @@ -701,10 +689,8 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { ms.chanTtMsgTimeMutex.Lock() ms.chanTtMsgTime[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp ms.chanTtMsgTimeMutex.Unlock() - sp.End() return } - sp.End() } } } @@ -810,6 +796,9 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*msgpb.MsgPosition) error { runLoop = false break } else if tsMsg.BeginTs() > mp.Timestamp { + ctx, _ := ExtractCtx(tsMsg, msg.Properties()) + tsMsg.SetTraceCtx(ctx) + tsMsg.SetPosition(&MsgPosition{ ChannelName: filepath.Base(msg.Topic()), MsgID: msg.ID().Serialize(), diff --git a/internal/mq/msgstream/trace.go b/internal/mq/msgstream/trace.go index 85c73ebb73..e610951e30 100644 --- a/internal/mq/msgstream/trace.go +++ b/internal/mq/msgstream/trace.go @@ -31,13 +31,13 @@ import ( func ExtractCtx(msg TsMsg, properties map[string]string) (context.Context, trace.Span) { ctx := msg.TraceCtx() if ctx == nil { - return ctx, trace.SpanFromContext(ctx) + ctx = context.Background() } if !allowTrace(msg) { return ctx, trace.SpanFromContext(ctx) } - ctx = otel.GetTextMapPropagator().Extract(msg.TraceCtx(), propagation.MapCarrier(properties)) - name := "receive msg" + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(properties)) + name := "ReceieveMsg" return otel.Tracer(name).Start(ctx, name, trace.WithAttributes( attribute.Int64("ID", msg.ID()), attribute.String("Type", msg.Type().String()), @@ -63,7 +63,7 @@ func MsgSpanFromCtx(ctx context.Context, msg TsMsg) (context.Context, trace.Span if !allowTrace(msg) { return ctx, trace.SpanFromContext(ctx) } - operationName := "send msg" + operationName := "SendMsg" opts := trace.WithAttributes( attribute.Int64("ID", msg.ID()), attribute.String("Type", msg.Type().String()), diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index a546e64b06..4cfbe26743 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -353,11 +353,21 @@ func (s *Segment) retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb. return nil, fmt.Errorf("%w(segmentID=%d)", ErrSegmentUnhealthy, s.segmentID) } + span := trace.SpanFromContext(ctx) + + traceID := span.SpanContext().TraceID() + spanID := span.SpanContext().SpanID() + traceCtx := C.CTraceContext{ + traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), + spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), + flag: C.uchar(span.SpanContext().TraceFlags()), + } + var retrieveResult RetrieveResult ts := C.uint64_t(plan.Timestamp) tr := timerecord.NewTimeRecorder("cgoRetrieve") - status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult) + status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, traceCtx, ts, &retrieveResult.cRetrieveResult) metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).Observe(float64(tr.CtxElapse(ctx, "finish cgoRetrieve").Milliseconds())) log.Debug("do retrieve on segment", diff --git a/internal/querynodev2/segments/retrieve.go b/internal/querynodev2/segments/retrieve.go index d4a7c2b1e5..c28f9295ba 100644 --- a/internal/querynodev2/segments/retrieve.go +++ b/internal/querynodev2/segments/retrieve.go @@ -34,7 +34,7 @@ func retrieveOnSegments(ctx context.Context, manager *Manager, segType SegmentTy if segment == nil { continue } - result, err := segment.Retrieve(plan) + result, err := segment.Retrieve(ctx, plan) if err != nil { return nil, err } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index a5be9b6bba..a4c9afa5da 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -363,7 +363,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S return &searchResult, nil } -func (s *LocalSegment) Retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, error) { +func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) { s.mut.RLock() defer s.mut.RUnlock() @@ -377,12 +377,23 @@ func (s *LocalSegment) Retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, zap.Int64("segmentID", s.ID()), ) + span := trace.SpanFromContext(ctx) + + traceID := span.SpanContext().TraceID() + spanID := span.SpanContext().SpanID() + traceCtx := C.CTraceContext{ + traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), + spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), + flag: C.uchar(span.SpanContext().TraceFlags()), + } + var retrieveResult RetrieveResult ts := C.uint64_t(plan.Timestamp) tr := timerecord.NewTimeRecorder("cgoRetrieve") status := C.Retrieve(s.ptr, plan.cRetrievePlan, + traceCtx, ts, &retrieveResult.cRetrieveResult, ) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index f03e6cc877..c9721779aa 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -207,6 +207,8 @@ func (node *QueryNode) InitSegcore() { if len(mmapDirPath) > 0 { log.Info("mmap enabled", zap.String("dir", mmapDirPath)) } + + initcore.InitTraceConfig(paramtable.Get()) } // Init function init historical and streaming module to manage segments