diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index b306101ec0..8cdc92ef14 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -168,7 +168,7 @@ func (c *ChannelStore) Reload() error { log.Info("channel store reload channel", zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) } - record.Record("ChannelStore reload") + log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) return nil } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 995e391815..7c975afd9c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -150,8 +150,7 @@ func (m *meta) reloadFromKV() error { for _, segIdx := range segmentIndexes { m.updateSegmentIndex(segIdx) } - - record.Record("meta reloadFromKV") + log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) return nil } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 685ee8754b..0ef13a74ef 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -262,7 +262,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { } } - buildIndexLatency := it.tr.Record("build index done") + buildIndexLatency := it.tr.RecordSpan() metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds())) indexBlobs, err := it.index.Serialize() @@ -270,7 +270,9 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) return err } - it.tr.Record("index serialize done") + + log.Ctx(ctx).Info("index serialize done", zap.Int64("buildID", it.BuildID), + zap.Duration("duration", it.tr.RecordSpan())) // use serialized size before encoding it.serializedSize = 0 @@ -301,7 +303,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { log.Warn("failed to serialize index", zap.Error(err)) return err } - encodeIndexFileDur := it.tr.Record("index codec serialize done") + encodeIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds())) it.indexBlobs = serializedIndexBlobs log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), @@ -379,7 +381,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { } } - buildIndexLatency := it.tr.Record("build index done") + buildIndexLatency := it.tr.RecordSpan() metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds())) fileInfos, err := it.index.GetIndexFileInfo() @@ -387,7 +389,9 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) return err } - it.tr.Record("index serialize done") + + log.Ctx(ctx).Info("index serialize done", zap.Int64("buildID", it.BuildID), + zap.Duration("duration", it.tr.RecordSpan())) // use serialized size before encoding it.serializedSize = 0 @@ -404,7 +408,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { log.Ctx(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) } - encodeIndexFileDur := it.tr.Record("index codec serialize done") + encodeIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds())) return nil } @@ -445,7 +449,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error { it.statistic.EndTime = time.Now().UnixMicro() it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic) log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths)) - saveIndexFileDur := it.tr.Record("index file save done") + saveIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds())) it.tr.Elapse("index building all done") log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), @@ -505,7 +509,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error { it.statistic.EndTime = time.Now().UnixMicro() it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic) log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths)) - saveIndexFileDur := it.tr.Record("index file save done") + saveIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds())) it.tr.Elapse("index building all done") log.Ctx(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), @@ -529,14 +533,15 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob it.partitionID = partitionID it.segmentID = segmentID - log.Ctx(ctx).Info("indexnode deserialize data success", + deserializeDur := it.tr.RecordSpan() + + log.Ctx(ctx).Info("IndexNode deserialize data success", zap.Int64("index id", it.req.IndexID), zap.String("index name", it.req.IndexName), zap.Int64("collectionID", it.collectionID), zap.Int64("partitionID", it.partitionID), - zap.Int64("segmentID", it.segmentID)) - - it.tr.Record("deserialize vector data done") + zap.Int64("segmentID", it.segmentID), + zap.Duration("deserialize duration", deserializeDur)) // we can ensure that there blobs are in one Field var data storage.FieldData diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index ac8cd12de4..5a46297d7d 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -236,13 +236,6 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { } dt.deleteMsg.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames) - log.Debug("send delete request to virtual channels", - zap.String("collection", dt.deleteMsg.GetCollectionName()), - zap.Int64("collection_id", collID), - zap.Strings("virtual_channels", channelNames), - zap.Int64("task_id", dt.ID())) - - tr.Record("get vchannels") // repack delete msg by dmChannel result := make(map[uint32]msgstream.TsMsg) collectionName := dt.deleteMsg.CollectionName @@ -301,14 +294,20 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { } } - tr.Record("pack messages") + log.Debug("send delete request to virtual channels", + zap.String("collection", dt.deleteMsg.GetCollectionName()), + zap.Int64("collection_id", collID), + zap.Strings("virtual_channels", channelNames), + zap.Int64("task_id", dt.ID()), + zap.Duration("prepare duration", tr.RecordSpan())) + err = stream.Produce(msgPack) if err != nil { dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError dt.result.Status.Reason = err.Error() return err } - sendMsgDur := tr.Record("send delete request to dml channels") + sendMsgDur := tr.ElapseSpan() metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds())) return nil diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 3119f4aba0..3e20dae73f 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -174,7 +174,6 @@ func (it *insertTask) Execute(ctx context.Context) error { defer sp.End() tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute insert %d", it.ID())) - defer tr.Elapse("insert execute done") collectionName := it.insertMsg.CollectionName collID, err := globalMetaCache.GetCollectionID(ctx, collectionName) @@ -195,13 +194,12 @@ func (it *insertTask) Execute(ctx context.Context) error { } } it.insertMsg.PartitionID = partitionID - tr.Record("get collection id & partition id from cache") - + getCacheDur := tr.RecordSpan() stream, err := it.chMgr.getOrCreateDmlStream(collID) if err != nil { return err } - tr.Record("get used message stream") + getMsgStreamDur := tr.RecordSpan() channelNames, err := it.chMgr.getVChannels(collID) if err != nil { @@ -219,7 +217,9 @@ func (it *insertTask) Execute(ctx context.Context) error { zap.Int64("collection_id", collID), zap.Int64("partition_id", partitionID), zap.Strings("virtual_channels", channelNames), - zap.Int64("task_id", it.ID())) + zap.Int64("task_id", it.ID()), + zap.Duration("get cache duration", getCacheDur), + zap.Duration("get msgStream duration", getMsgStreamDur)) // assign segmentID for insert data and repack data by segmentID var msgPack *msgstream.MsgPack @@ -232,21 +232,25 @@ func (it *insertTask) Execute(ctx context.Context) error { it.result.Status.Reason = err.Error() return err } + assignSegmentIDDur := tr.RecordSpan() + log.Debug("assign segmentID for insert data success", zap.Int64("collectionID", collID), - zap.String("collectionName", it.insertMsg.CollectionName)) - tr.Record("assign segment id") + zap.String("collectionName", it.insertMsg.CollectionName), + zap.Duration("assign segmentID duration", assignSegmentIDDur)) err = stream.Produce(msgPack) if err != nil { it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError it.result.Status.Reason = err.Error() return err } - sendMsgDur := tr.Record("send insert request to dml channel") + sendMsgDur := tr.RecordSpan() metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(sendMsgDur.Milliseconds())) - + totalExecDur := tr.ElapseSpan() log.Debug("Proxy Insert Execute done", - zap.String("collectionName", collectionName)) + zap.String("collectionName", collectionName), + zap.Duration("send message duration", sendMsgDur), + zap.Duration("execute duration", totalExecDur)) return nil } diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index a132c82bde..b9d848d90b 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -336,14 +336,13 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP } } it.upsertMsg.InsertMsg.PartitionID = partitionID - tr.Record("get collection id & partition id from cache when insertExecute") + getCacheDur := tr.RecordSpan() _, err = it.chMgr.getOrCreateDmlStream(collID) if err != nil { return err } - tr.Record("get used message stream when insertExecute") - + getMsgStreamDur := tr.RecordSpan() channelNames, err := it.chMgr.getVChannels(collID) if err != nil { log.Error("get vChannels failed when insertExecute", @@ -359,7 +358,9 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP zap.Int64("collection_id", collID), zap.Int64("partition_id", partitionID), zap.Strings("virtual_channels", channelNames), - zap.Int64("task_id", it.ID())) + zap.Int64("task_id", it.ID()), + zap.Duration("get cache duration", getCacheDur), + zap.Duration("get msgStream duration", getMsgStreamDur)) // assign segmentID for insert data and repack data by segmentID insertMsgPack, err := assignSegmentID(it.TraceCtx(), it.upsertMsg.InsertMsg, it.result, channelNames, it.idAllocator, it.segIDAssigner) @@ -370,9 +371,10 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP it.result.Status.Reason = err.Error() return err } + assignSegmentIDDur := tr.RecordSpan() log.Debug("assign segmentID for insert data success when insertExecute", - zap.String("collectionName", it.req.CollectionName)) - tr.Record("assign segment id") + zap.String("collectionName", it.req.CollectionName), + zap.Duration("assign segmentID duration", assignSegmentIDDur)) msgPack.Msgs = append(msgPack.Msgs, insertMsgPack.Msgs...) log.Debug("Proxy Insert Execute done when upsert", @@ -383,8 +385,6 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgPack) (err error) { tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy deleteExecute upsert %d", it.ID())) - defer tr.Elapse("delete execute done when upsert") - collID := it.upsertMsg.DeleteMsg.CollectionID log := log.Ctx(ctx).With( zap.Int64("collectionID", collID)) @@ -399,11 +399,6 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP it.upsertMsg.DeleteMsg.PrimaryKeys = it.result.IDs it.upsertMsg.DeleteMsg.HashValues = typeutil.HashPK2Channels(it.upsertMsg.DeleteMsg.PrimaryKeys, channelNames) - log.Debug("send delete request to virtual channels when deleteExecute", - zap.Int64("collection_id", collID), - zap.Strings("virtual_channels", channelNames)) - - tr.Record("get vchannels") // repack delete msg by dmChannel result := make(map[uint32]msgstream.TsMsg) collectionName := it.upsertMsg.DeleteMsg.CollectionName @@ -463,7 +458,9 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP } msgPack.Msgs = append(msgPack.Msgs, deleteMsgPack.Msgs...) - log.Debug("Proxy Upsert deleteExecute done") + log.Debug("Proxy Upsert deleteExecute done", zap.Int64("collection_id", collID), + zap.Strings("virtual_channels", channelNames), zap.Int64("taskID", it.ID()), + zap.Duration("prepare duration", tr.ElapseSpan())) return nil } @@ -493,16 +490,18 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) { return err } - tr.Record("pack messages in upsert") + tr.RecordSpan() err = stream.Produce(msgPack) if err != nil { it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError it.result.Status.Reason = err.Error() return err } - sendMsgDur := tr.Record("send upsert request to dml channels") + sendMsgDur := tr.RecordSpan() metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds())) - log.Debug("Proxy Upsert Execute done") + totalDur := tr.ElapseSpan() + log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()), + zap.Duration("total duration", totalDur)) return nil } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 0f2585c1e5..bd0ba7758d 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -320,8 +320,7 @@ func (s *Server) initMeta() error { LeaderViewManager: meta.NewLeaderViewManager(), } s.targetMgr = meta.NewTargetManager(s.broker, s.meta) - - record.Record("Server initMeta") + log.Info("QueryCoord server initMeta done", zap.Duration("duration", record.ElapseSpan())) return nil } diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go index e9d6cfc943..644b3cfff1 100644 --- a/internal/querynode/task_read.go +++ b/internal/querynode/task_read.go @@ -75,9 +75,9 @@ func (b *baseReadTask) SetStep(step TaskStep) { switch step { case TaskStepEnqueue: b.queueDur = 0 - b.tr.Record("enqueue done") + b.tr.RecordSpan() case TaskStepPreExecute: - b.queueDur = b.tr.Record("start to process") + b.queueDur = b.tr.RecordSpan() } } @@ -108,9 +108,9 @@ func (b *baseReadTask) PostExecute(ctx context.Context) error { func (b *baseReadTask) Notify(err error) { switch b.step { case TaskStepEnqueue: - b.queueDur = b.tr.Record("enqueueEnd") + b.queueDur = b.tr.RecordSpan() case TaskStepPostExecute: - b.tr.Record("execute task done") + b.tr.RecordSpan() } b.baseTask.Notify(err) } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 36bb270e42..ca639d90ff 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -148,8 +148,7 @@ func (mt *MetaTable) reload() error { metrics.RootCoordNumOfCollections.Set(float64(collectionNum)) metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(partitionNum)) - - record.Record("MetaTable reload") + log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan())) return nil }