From a0ca4d61088368a70e50c97a596afc6e893e0615 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 3 Apr 2023 10:54:23 +0800
Subject: [PATCH] Improve search/query logs in Proxy (#23144)

Signed-off-by: bigsheeper
---
 internal/proxy/task_query.go  | 81 +++++++++++++++--------------
 internal/proxy/task_search.go | 27 +++++++-----
 2 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go
index bcb1307051..2b7eb4f239 100644
--- a/internal/proxy/task_query.go
+++ b/internal/proxy/task_query.go
@@ -251,52 +251,39 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 
 	collectionName := t.request.CollectionName
 	t.collectionName = collectionName
+
+	log := log.Ctx(ctx).With(zap.String("collectionName", collectionName),
+		zap.Strings("partitionNames", t.request.GetPartitionNames()),
+		zap.String("requestType", "query"))
+
 	if err := validateCollectionName(collectionName); err != nil {
-		log.Ctx(ctx).Warn("Invalid collectionName.",
-			zap.String("collectionName", collectionName),
-			zap.String("requestType", "query"))
+		log.Warn("Invalid collectionName.")
 		return err
 	}
-
-	log.Ctx(ctx).Debug("Validate collectionName.",
-		zap.Any("collectionName", collectionName),
-		zap.Any("requestType", "query"))
+	log.Debug("Validate collectionName.")
 
 	collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
 	if err != nil {
-		log.Ctx(ctx).Warn("Failed to get collection id.",
-			zap.Any("collectionName", collectionName),
-			zap.Any("requestType", "query"))
+		log.Warn("Failed to get collection id.")
 		return err
 	}
-
 	t.CollectionID = collID
-	log.Ctx(ctx).Debug("Get collection ID by name",
-		zap.Int64("collectionID", t.CollectionID),
-		zap.String("collectionName", collectionName),
-		zap.Any("requestType", "query"))
+	log.Debug("Get collection ID by name", zap.Int64("collectionID", t.CollectionID))
 
 	for _, tag := range t.request.PartitionNames {
 		if err := validatePartitionTag(tag, false); err != nil {
-			log.Ctx(ctx).Warn("invalid partition name",
-				zap.String("partition name", tag),
-				zap.Any("requestType", "query"))
+			log.Warn("invalid partition name", zap.String("partition name", tag))
 			return err
 		}
 	}
-	log.Ctx(ctx).Debug("Validate partition names.",
-		zap.Any("requestType", "query"))
+	log.Debug("Validate partition names.")
 
 	t.RetrieveRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, t.request.GetPartitionNames())
 	if err != nil {
-		log.Ctx(ctx).Warn("failed to get partitions in collection.", zap.String("collectionName", collectionName),
-			zap.Error(err),
-			zap.Any("requestType", "query"))
+		log.Warn("failed to get partitions in collection.", zap.Error(err))
 		return err
 	}
-	log.Ctx(ctx).Debug("Get partitions in collection.",
-		zap.Any("collectionName", collectionName),
-		zap.Any("requestType", "query"))
+	log.Debug("Get partitions in collection.", zap.Int64s("partitionIDs", t.RetrieveRequest.GetPartitionIDs()))
 
 	//fetch search_growing from search param
 	var ignoreGrowing bool
@@ -373,8 +360,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 	}
 
 	t.DbID = 0 // TODO
-	log.Ctx(ctx).Debug("Query PreExecute done.",
-		zap.Any("requestType", "query"),
+	log.Debug("Query PreExecute done.",
 		zap.Uint64("guarantee_ts", guaranteeTs),
 		zap.Uint64("travel_ts", t.GetTravelTimestamp()),
 		zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()))
@@ -384,7 +370,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 func (t *queryTask) Execute(ctx context.Context) error {
 	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute query %d", t.ID()))
 	defer tr.CtxElapse(ctx, "done")
-	log := log.Ctx(ctx)
+	log := log.Ctx(ctx).With(zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
+		zap.String("requestType", "query"))
 
 	executeQuery := func(withCache bool) error {
 		shards, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName)
@@ -412,8 +400,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
 		return fmt.Errorf("fail to query on all shard leaders, err=%s", err.Error())
 	}
 
-	log.Debug("Query Execute done.",
-		zap.String("requestType", "query"))
+	log.Debug("Query Execute done.")
 	return nil
 }
 
@@ -423,18 +410,22 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
 		tr.CtxElapse(ctx, "done")
 	}()
 
+	log := log.Ctx(ctx).With(zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
+		zap.String("requestType", "query"))
+
 	var err error
 	select {
 	case <-t.TraceCtx().Done():
-		log.Ctx(ctx).Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID()))
+		log.Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID()))
 		return nil
 	default:
-		log.Ctx(ctx).Debug("all queries are finished or canceled")
+		log.Debug("all queries are finished or canceled")
 		close(t.resultBuf)
 		for res := range t.resultBuf {
 			t.toReduceResults = append(t.toReduceResults, res)
-			log.Ctx(ctx).Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()))
+			log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()))
 		}
 	}
 
@@ -449,8 +440,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
 	}
 
 	metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
-	log.Ctx(ctx).Debug("Query PostExecute done",
-		zap.String("requestType", "query"))
+	log.Debug("Query PostExecute done")
 	return nil
 }
 
@@ -463,29 +453,28 @@ func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.Query
 		Scope: querypb.DataScope_All,
 	}
 
+	log := log.Ctx(ctx).With(zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
+		zap.Int64("nodeID", nodeID),
+		zap.Strings("channels", channelIDs))
+
 	result, err := qn.Query(ctx, req)
 	if err != nil {
-		log.Ctx(ctx).Warn("QueryNode query return error",
-			zap.Int64("nodeID", nodeID),
-			zap.Strings("channels", channelIDs), zap.Error(err))
+		log.Warn("QueryNode query return error", zap.Error(err))
 		globalMetaCache.DeprecateShardCache(t.collectionName)
 		return err
 	}
 	if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
-		log.Ctx(ctx).Warn("QueryNode is not shardLeader", zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs))
+		log.Warn("QueryNode is not shardLeader")
		globalMetaCache.DeprecateShardCache(t.collectionName)
 		return errInvalidShardLeaders
 	}
 	if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-		log.Ctx(ctx).Warn("QueryNode query result error",
-			zap.Int64("nodeID", nodeID),
-			zap.String("reason", result.GetStatus().GetReason()))
+		log.Warn("QueryNode query result error")
 		return fmt.Errorf("fail to Query, QueryNode ID = %d, reason=%s", nodeID, result.GetStatus().GetReason())
 	}
 
-	log.Ctx(ctx).Debug("get query result",
-		zap.Int64("nodeID", nodeID),
-		zap.Strings("channelIDs", channelIDs))
+	log.Debug("get query result")
 	t.resultBuf <- result
 	return nil
 }

diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go
index 9f828c59a9..4235840366 100644
--- a/internal/proxy/task_search.go
+++ b/internal/proxy/task_search.go
@@ -379,7 +379,9 @@ func (t *searchTask) Execute(ctx context.Context) error {
 		return fmt.Errorf("fail to search on all shard leaders, err=%v", err)
 	}
 
-	log.Debug("Search Execute done.")
+	log.Debug("Search Execute done.",
+		zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()))
 	return nil
 }
 
@@ -420,6 +422,8 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 
 	// Reduce all search results
 	log.Ctx(ctx).Debug("proxy search post execute reduce",
+		zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
 		zap.Int("number of valid search results", len(validSearchResults)))
 	tr.CtxRecord(ctx, "reduceResultStart")
 	primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(t.schema)
@@ -437,7 +441,9 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 	t.result.CollectionName = t.collectionName
 	t.fillInFieldInfo()
 
-	log.Ctx(ctx).Debug("Search post execute done")
+	log.Ctx(ctx).Debug("Search post execute done",
+		zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()))
 	return nil
 }
 
@@ -451,6 +457,11 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que
 		TotalChannelNum: t.channelNum,
 	}
 
+	log := log.Ctx(ctx).With(zap.Int64("collection", t.GetCollectionID()),
+		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
+		zap.Int64("nodeID", nodeID),
+		zap.Strings("channels", channelIDs))
+
 	queryNode := querynode.GetQueryNode()
 	var result *internalpb.SearchResults
 	var err error
@@ -461,23 +472,17 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que
 		result, err = qn.Search(ctx, req)
 	}
 	if err != nil {
-		log.Ctx(ctx).Warn("QueryNode search return error",
-			zap.Int64("nodeID", nodeID),
-			zap.Strings("channels", channelIDs),
-			zap.Error(err))
+		log.Warn("QueryNode search return error", zap.Error(err))
 		globalMetaCache.DeprecateShardCache(t.collectionName)
 		return err
 	}
 	if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
-		log.Ctx(ctx).Warn("QueryNode is not shardLeader",
-			zap.Int64("nodeID", nodeID),
-			zap.Strings("channels", channelIDs))
+		log.Warn("QueryNode is not shardLeader")
 		globalMetaCache.DeprecateShardCache(t.collectionName)
 		return errInvalidShardLeaders
 	}
 	if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-		log.Ctx(ctx).Warn("QueryNode search result error",
-			zap.Int64("nodeID", nodeID),
+		log.Warn("QueryNode search result error",
 			zap.String("reason", result.GetStatus().GetReason()))
 		return fmt.Errorf("fail to Search, QueryNode ID=%d, reason=%s", nodeID, result.GetStatus().GetReason())
 	}
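
Note: the pattern this patch applies throughout is zap's field binding: build one logger that carries the per-request fields via With(...), then reuse it, instead of repeating zap.String("collectionName", ...) and zap.String("requestType", "query") at every call site. Below is a minimal standalone sketch of that pattern using plain go.uber.org/zap; it is illustrative only. The collection and partition values are hypothetical, and Milvus itself goes through its own log.Ctx(ctx) wrapper rather than a bare zap logger.

	package main

	import (
		"errors"

		"go.uber.org/zap"
	)

	func main() {
		base, err := zap.NewDevelopment()
		if err != nil {
			panic(err)
		}
		defer base.Sync()

		// Bind the fields shared by every log line of this request once.
		// (Field values here are made up for the sketch.)
		logger := base.With(
			zap.String("collectionName", "demo_collection"),
			zap.Strings("partitionNames", []string{"_default"}),
			zap.String("requestType", "query"),
		)

		// Later calls only state what happened; the bound fields are
		// appended to every entry automatically.
		logger.Debug("Validate collectionName.")
		logger.Warn("failed to get partitions in collection.",
			zap.Error(errors.New("partition not found")))
	}

The practical effect, as in the diff above, is that each log statement shrinks to its message plus only the fields unique to that statement, while the shared context still appears on every line.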