From 8330c18dc98ec26392d19df4e898d4c51058e430 Mon Sep 17 00:00:00 2001 From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Thu, 31 Aug 2023 12:03:00 +0800 Subject: [PATCH] add log for loading segment(#26564) (#26640) /kind improvement Signed-off-by: MrPresent-Han --- internal/core/src/segcore/SegmentSealedImpl.cpp | 9 +++++++-- internal/querynodev2/segments/segment.go | 9 +++++---- internal/querynodev2/segments/segment_loader.go | 2 +- internal/querynodev2/services.go | 9 +++++++-- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 23cd9d6b55..a7f627edc2 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -188,18 +188,23 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { auto parallel_degree = static_cast( DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); field_data_info.channel->set_capacity(parallel_degree * 2); - auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); auto load_future = pool.Submit( LoadFieldDatasFromRemote, insert_files, field_data_info.channel); - + LOG_SEGCORE_INFO_ << "finish submitting LoadFieldDatasFromRemote task " + "to thread pool, " + << "segmentID:" << this->id_ + << ", fieldID:" << info.field_id; if (load_info.mmap_dir_path.empty() || SystemProperty::Instance().IsSystem(field_id)) { LoadFieldData(field_id, field_data_info); } else { MapFieldData(field_id, field_data_info); } + LOG_SEGCORE_INFO_ << "finish loading segment field, " + << "segmentID:" << this->id_ + << ", fieldID:" << info.field_id; } } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 5c141f1faf..7fd5a797ac 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -706,7 +706,10 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), + zap.Int64("fieldID", fieldID), + zap.Int64("rowCount", rowCount), ) + log.Info("start loading field data for field") loadFieldDataInfo, err := newLoadFieldDataInfo() defer deleteFieldDataInfo(loadFieldDataInfo) @@ -729,6 +732,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap var status C.CStatus GetDynamicPool().Submit(func() (any, error) { + log.Info("submitted loadFieldData task to dy pool") status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) return nil, nil }).Await() @@ -736,10 +740,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap return err } - log.Info("load field done", - zap.Int64("fieldID", fieldID), - zap.Int64("row count", rowCount), - zap.Int64("segmentID", s.ID())) + log.Info("load field done") return nil } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 9cac1f91a3..1e4d77811f 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -584,7 +584,7 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen return err } - log.Info("load field binlogs done for sealed segment", + log.Ctx(ctx).Info("load field binlogs done for sealed segment", zap.Int64("collection", segment.collectionID), zap.Int64("segment", segment.segmentID), zap.Int("len(field)", len(fields)), diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 6abad18375..30db05f4ff 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -216,6 +216,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", channel.GetChannelName()), + zap.Int64("currentNodeID", paramtable.GetNodeID()), ) log.Info("received watch channel request", @@ -360,6 +361,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannelName()), + zap.Int64("currentNodeID", paramtable.GetNodeID()), ) log.Info("received unsubscribe channel request") @@ -433,6 +435,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen zap.Int64("partitionID", segment.GetPartitionID()), zap.String("shard", segment.GetInsertChannel()), zap.Int64("segmentID", segment.GetSegmentID()), + zap.Int64("currentNodeID", paramtable.GetNodeID()), ) log.Info("received load segments request", @@ -552,6 +555,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release zap.Int64("collectionID", req.GetCollectionID()), zap.String("shard", req.GetShard()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), + zap.Int64("currentNodeID", paramtable.GetNodeID()), ) log.Info("received release segment request", @@ -1244,7 +1248,8 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get } func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { - log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel())) + log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), + zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", paramtable.GetNodeID())) // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) @@ -1266,7 +1271,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi // get shard delegator shardDelegator, ok := node.delegators.Get(req.GetChannel()) if !ok { - log.Warn("failed to find shard cluster when sync ", zap.String("channel", req.GetChannel())) + log.Warn("failed to find shard cluster when sync") return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "shard not exist",