From a7a9eeeffed6e570bf0f25d61b9ceea7361a97f7 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 30 Jun 2021 19:46:14 +0800 Subject: [PATCH] Add time record for build index and search process (#6231) * add timerecord for build index process Signed-off-by: yudong.cai * fix build error Signed-off-by: yudong.cai * add timerecord for query process Signed-off-by: yudong.cai --- internal/indexnode/task.go | 21 ++++++++----- internal/querycoord/task.go | 2 +- internal/querynode/query_collection.go | 42 ++++++++++++++++++-------- internal/querynode/segment.go | 6 ++-- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index c2d077b520..81648b1787 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -14,11 +14,10 @@ package indexnode import ( "context" "errors" + "fmt" "runtime" "strconv" - "github.com/milvus-io/milvus/internal/util/retry" - "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -29,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/timerecord" ) const ( @@ -176,6 +177,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask Execute ...") + tr := timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID)) var err error typeParams := make(map[string]string) @@ -275,6 +277,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if err != nil { return err } + tr.Record("loadKey done") storageBlobs := getStorageBlobs(blobs) var insertCodec storage.InsertCodec @@ -286,6 +289,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if len(insertData.Data) != 1 { return errors.New("we expect only one 
field in deserialized insert data") } + tr.Record("deserialize storage blobs done") for _, value := range insertData.Data { // TODO: BinaryVectorFieldData @@ -296,6 +300,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err)) return err } + tr.Record("build float vector index done") } binaryVectorFieldData, bOk := value.(*storage.BinaryVectorFieldData) @@ -305,6 +310,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err)) return err } + tr.Record("build binary vector index done") } if !fOk && !bOk { @@ -314,15 +320,17 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { indexBlobs, err := it.index.Serialize() if err != nil { log.Error("IndexNode index Serialize failed", zap.Error(err)) return err } + tr.Record("serialize index done") var indexCodec storage.IndexCodec serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams, it.req.IndexName, it.req.IndexID) if err != nil { return err } + tr.Record("serialize index codec done") getSavePathByKey := func(key string) string { // TODO: fix me, use more reasonable method @@ -351,7 +359,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Debug("IndexNode Unmarshal indexMeta error ", zap.Error(err)) return err } - log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) + //log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) if indexMeta.Version > it.req.Version { log.Debug("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version), zap.Any("indexMeta.Version", indexMeta.Version)) @@ -373,10 +381,8 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if err != nil { return err } + tr.Record("save index file done") } - // err = it.index.Delete() - // if err != nil { - // 
log.Print("CIndexDelete Failed") - // } + tr.Elapse("all done") return nil } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index ab4d3765b7..999b33e1d5 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -1255,7 +1255,7 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error { } } } - assignInternalTask(collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs) + assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs) log.Debug("loadBalanceTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) } } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 187c83059c..a6627c84aa 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -19,9 +19,8 @@ import ( "reflect" "sync" - "github.com/milvus-io/milvus/internal/proto/schemapb" - "github.com/milvus-io/milvus/internal/proto/segcorepb" - "github.com/milvus-io/milvus/internal/util/typeutil" + oplog "github.com/opentracing/opentracing-go/log" + "go.uber.org/zap" "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/log" @@ -29,10 +28,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" - oplog "github.com/opentracing/opentracing-go/log" - "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/typeutil" ) type queryCollection struct { @@ -285,6 +286,7 @@ func (q *queryCollection) receiveQueryMsg(msg msgstream.TsMsg) { sp, ctx := 
trace.StartSpanFromContext(msg.TraceCtx()) msg.SetTraceCtx(ctx) + tr := timerecord.NewTimeRecorder(fmt.Sprintf("receiveQueryMsg %d", msg.ID())) // check if collection has been released collection, err := q.historical.replica.getCollectionByID(collectionID) @@ -340,6 +342,8 @@ func (q *queryCollection) receiveQueryMsg(msg msgstream.TsMsg) { sp.Finish() return } + tr.Record("get searchable time done") + log.Debug("doing query in receiveQueryMsg...", zap.Int64("collectionID", collectionID), zap.Int64("msgID", msg.ID()), @@ -355,6 +359,7 @@ func (q *queryCollection) receiveQueryMsg(msg msgstream.TsMsg) { log.Error(err.Error()) return } + tr.Record("operation done") if err != nil { log.Error(err.Error()) @@ -374,6 +379,7 @@ func (q *queryCollection) receiveQueryMsg(msg msgstream.TsMsg) { zap.Int64("msgID", msg.ID()), zap.String("msgType", msgTypeStr), ) + tr.Elapse("all done") sp.Finish() } @@ -393,9 +399,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() { zap.Any("tSafe", st)) q.setServiceableTime(serviceTime) - //log.Debug("query node::doUnsolvedMsg: setServiceableTime", - // zap.Any("serviceTime", st), - //) + //log.Debug("query node::doUnsolvedMsg: setServiceableTime", zap.Any("serviceTime", st)) unSolvedMsg := make([]msgstream.TsMsg, 0) tempMsg := q.popAllUnsolvedMsg() @@ -752,6 +756,8 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { oplog.Object("dsl", searchMsg.Dsl)) } + tr := timerecord.NewTimeRecorder(fmt.Sprintf("search %d(nq=%d, k=%d)", searchMsg.CollectionID, queryNum, topK)) + searchResults := make([]*SearchResult, 0) matchedSegments := make([]*Segment, 0) sealedSegmentSearched := make([]UniqueID, 0) @@ -767,6 +773,7 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { for _, seg := range hisSegmentResults { sealedSegmentSearched = append(sealedSegmentSearched, seg.segmentID) } + tr.Record("historical search done") // streaming search var err2 error @@ -781,6 +788,7 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error 
{ searchResults = append(searchResults, strSearchResults...) matchedSegments = append(matchedSegments, strSegmentResults...) } + tr.Record("streaming search done") sp.LogFields(oplog.String("statistical time", "segment search end")) if len(searchResults) <= 0 { @@ -842,6 +850,8 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { if err != nil { return err } + tr.Record("publish empty search result done") + tr.Elapse("all done") return nil } } @@ -883,6 +893,7 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { if err != nil { return err } + tr.Record("reduce result done") var offset int64 = 0 for index := range searchRequests { @@ -962,6 +973,7 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { if err != nil { return err } + tr.Record("publish search result") } sp.LogFields(oplog.String("statistical time", "before free c++ memory")) @@ -970,6 +982,7 @@ func (q *queryCollection) search(msg msgstream.TsMsg) error { sp.LogFields(oplog.String("statistical time", "stats done")) plan.delete() searchReq.delete() + tr.Elapse("all done") return nil } @@ -1003,6 +1016,8 @@ func (q *queryCollection) retrieve(msg msgstream.TsMsg) error { } defer plan.delete() + tr := timerecord.NewTimeRecorder(fmt.Sprintf("retrieve %d", retrieveMsg.CollectionID)) + var partitionIDsInHistorical []UniqueID var partitionIDsInStreaming []UniqueID partitionIDsInQuery := retrieveMsg.PartitionIDs @@ -1029,7 +1044,6 @@ func (q *queryCollection) retrieve(msg msgstream.TsMsg) error { } } } - sealedSegmentRetrieved := make([]UniqueID, 0) var mergeList []*segcorepb.RetrieveResults for _, partitionID := range partitionIDsInHistorical { @@ -1050,6 +1064,7 @@ func (q *queryCollection) retrieve(msg msgstream.TsMsg) error { sealedSegmentRetrieved = append(sealedSegmentRetrieved, segmentID) } } + tr.Record("historical retrieve done") for _, partitionID := range partitionIDsInStreaming { segmentIDs, err := q.streaming.replica.getSegmentIDs(partitionID) @@ -1068,11 +1083,13 
@@ func (q *queryCollection) retrieve(msg msgstream.TsMsg) error { mergeList = append(mergeList, result) } } + tr.Record("streaming retrieve done") result, err := mergeRetrieveResults(mergeList) if err != nil { return err } + tr.Record("merge result done") resultChannelInt := 0 retrieveResultMsg := &msgstream.RetrieveResultMsg{ @@ -1094,15 +1111,16 @@ func (q *queryCollection) retrieve(msg msgstream.TsMsg) error { }, } - err3 := q.publishQueryResult(retrieveResultMsg, retrieveMsg.CollectionID) - if err3 != nil { - return err3 + err = q.publishQueryResult(retrieveResultMsg, retrieveMsg.CollectionID) + if err != nil { + return err } log.Debug("QueryNode publish RetrieveResultMsg", zap.Any("vChannels", collection.getVChannels()), zap.Any("collectionID", collection.ID()), zap.Any("sealedSegmentRetrieved", sealedSegmentRetrieved), ) + tr.Elapse("all done") return nil } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 31640320f4..f757ed342f 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -176,13 +176,13 @@ func (s *Segment) getRowCount() int64 { long int getRowCount(CSegmentInterface c_segment); */ - segmentPtrIsNil := s.segmentPtr == nil - log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil)) + //segmentPtrIsNil := s.segmentPtr == nil + //log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil)) if s.segmentPtr == nil { return -1 } var rowCount = C.GetRowCount(s.segmentPtr) - log.Debug("QueryNode::Segment::getRowCount", zap.Any("rowCount", rowCount)) + //log.Debug("QueryNode::Segment::getRowCount", zap.Any("rowCount", rowCount)) return int64(rowCount) }