From e8ea8b51d5689cee303e9f16cee155076a3b7061 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Sat, 31 Jul 2021 10:47:22 +0800 Subject: [PATCH] improve query node log (#6897) Signed-off-by: bigsheeper --- internal/querynode/cgo_helper.go | 2 +- internal/querynode/collection_replica.go | 2 +- internal/querynode/flow_graph_dd_node.go | 8 ++--- .../querynode/flow_graph_filter_dm_node.go | 8 ++--- internal/querynode/flow_graph_insert_node.go | 11 +++--- internal/querynode/flow_graph_query_node.go | 2 +- .../querynode/flow_graph_service_time_node.go | 4 +-- internal/querynode/historical.go | 4 +-- internal/querynode/impl.go | 16 ++++----- internal/querynode/index_loader.go | 2 +- internal/querynode/param_table.go | 4 +-- internal/querynode/query_collection.go | 36 +++++++++---------- internal/querynode/query_service.go | 2 +- internal/querynode/reduce.go | 2 +- internal/querynode/segment.go | 4 +-- internal/querynode/segment_loader.go | 14 ++++---- internal/querynode/streaming.go | 4 +-- internal/querynode/task.go | 30 ++++++++-------- internal/querynode/task_scheduler.go | 4 +-- internal/querynode/tsafe_replica.go | 10 +++--- 20 files changed, 84 insertions(+), 85 deletions(-) diff --git a/internal/querynode/cgo_helper.go b/internal/querynode/cgo_helper.go index b7ff253239..554702c58f 100644 --- a/internal/querynode/cgo_helper.go +++ b/internal/querynode/cgo_helper.go @@ -74,7 +74,7 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error { finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg) logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg) - log.Error(logMsg) + log.Warn(logMsg) return errors.New(finalMsg) } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index e20a3f9085..7db29c0cb1 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -410,7 +410,7 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er key := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID) err = colReplica.etcdKV.Remove(key) if err != nil { - log.Error("error when remove segment info from etcd") + log.Warn("error when remove segment info from etcd") } return nil diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index 4bc25d7a04..8baf31be5f 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -112,14 +112,14 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { var schema schemapb.CollectionSchema err := proto.Unmarshal(msg.Schema, &schema) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } // add collection err = ddNode.replica.addCollection(collectionID, &schema) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } @@ -127,7 +127,7 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { // TODO: allocate default partition id in master err = ddNode.replica.addPartition(collectionID, partitionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } @@ -156,7 +156,7 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { err := ddNode.replica.addPartition(collectionID, partitionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index c50000f1a3..319649fb8e 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -128,7 +128,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg if fdmNode.loadType == loadTypeCollection { col, err := fdmNode.replica.getCollectionByID(msg.CollectionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return nil } if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil { @@ -142,7 +142,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // so we need to compare the endTimestamp of received messages and position's timestamp. excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return nil } for _, segmentInfo := range excludedSegments { @@ -156,7 +156,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { // TODO: what if the messages are misaligned? Here, we ignore those messages and print error - log.Error("Error, misaligned messages detected") + log.Warn("Error, misaligned messages detected") return nil } @@ -184,7 +184,7 @@ func newFilteredDmNode(replica ReplicaInterface, if loadType != loadTypeCollection && loadType != loadTypePartition { err := errors.New("invalid flow graph type") - log.Error(err.Error()) + log.Warn(err.Error()) } return &filterDmNode{ diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 2d143fd52c..f8a1c005dd 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -79,7 +79,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if hasPartition := iNode.replica.hasPartition(task.PartitionID); !hasPartition { err := iNode.replica.addPartition(task.CollectionID, task.PartitionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) continue } } @@ -88,7 +88,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if !iNode.replica.hasSegment(task.SegmentID) { err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ChannelID, segmentTypeGrowing, true) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) continue } } @@ -102,15 +102,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID := range insertData.insertRecords { var targetSegment, err = iNode.replica.getSegmentByID(segmentID) if err != nil { - log.Error("preInsert failed") - // TODO: add error handling + log.Warn(err.Error()) } var numOfRecords = len(insertData.insertRecords[segmentID]) if targetSegment != nil { offset, err := targetSegment.segmentPreInsert(numOfRecords) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } insertData.insertOffset[segmentID] = offset log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID)) @@ -144,7 +143,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn return } if err != nil { - log.Error("cannot find segment:", zap.Int64("segmentID", segmentID)) + log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID)) // TODO: add error handling wg.Done() return diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 9d2bac360e..1592238d3c 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -105,7 +105,7 @@ func newQueryNodeFlowGraph(ctx context.Context, func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode { insertStream, err := factory.NewTtMsgStream(ctx) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { q.dmlStream = insertStream } diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 85c165a96e..10b278b3c8 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -73,7 +73,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { //) //if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil { - // log.Error("Error: send time tick into pulsar channel failed", zap.Error(err)) + // log.Warn("Error: send time tick into pulsar channel failed", zap.Error(err)) //} var res Msg = &gcMsg{ @@ -121,7 +121,7 @@ func newServiceTimeNode(ctx context.Context, timeTimeMsgStream, err := factory.NewMsgStream(ctx) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { // TODO: use param table timeTickChannel := "query-node-time-tick-0" diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index 78494a6c9e..23ea0bf2ce 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -91,7 +91,7 @@ func (h *historical) watchGlobalSegmentMeta() { for _, event := range resp.Events { segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64) if err != nil { - log.Error("watchGlobalSegmentMeta failed", zap.Any("error", err.Error())) + log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error())) continue } switch event.Type { @@ -102,7 +102,7 @@ func (h *historical) watchGlobalSegmentMeta() { segmentInfo := &querypb.SegmentInfo{} err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo) if err != nil { - log.Error("watchGlobalSegmentMeta failed", zap.Any("error", err.Error())) + log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error())) continue } h.addGlobalSegmentInfo(segmentID, segmentInfo) diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index f0b02c521f..5dd8a70d82 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -211,7 +211,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, err } log.Debug("watchDmChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID)) @@ -219,7 +219,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } log.Debug("watchDmChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID)) @@ -256,7 +256,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, err } segmentIDs := make([]UniqueID, 0) @@ -268,7 +268,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64s("segmentIDs", segmentIDs)) @@ -305,7 +305,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, err } log.Debug("releaseCollectionTask Enqueue done", zap.Any("collectionID", in.CollectionID)) @@ -313,7 +313,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } log.Debug("releaseCollectionTask WaitToFinish done", zap.Any("collectionID", in.CollectionID)) @@ -350,7 +350,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, err } log.Debug("releasePartitionsTask Enqueue done", zap.Any("collectionID", in.CollectionID)) @@ -358,7 +358,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } log.Debug("releasePartitionsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID)) diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 54ebe00d18..d2cb130e5f 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -85,7 +85,7 @@ type indexLoader struct { // // sendQueryNodeStats // err := loader.sendQueryNodeStats() // if err != nil { -// log.Error(err.Error()) +// log.Warn(err.Error()) // wg.Done() // return // } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 10985125fc..2fb56af348 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -119,7 +119,7 @@ func (p *ParamTable) Init() { func (p *ParamTable) initQueryTimeTickChannelName() { ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } p.QueryTimeTickChannelName = ch } @@ -240,7 +240,7 @@ func (p *ParamTable) initGracefulTime() { func (p *ParamTable) initMsgChannelSubName() { namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10) p.MsgChannelSubName = subName diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index a4c7c54ce8..34f551cf6f 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -128,7 +128,7 @@ func (q *queryCollection) close() { func (q *queryCollection) register() { collection, err := q.streaming.replica.getCollectionByID(q.collectionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } @@ -166,7 +166,7 @@ func (q *queryCollection) waitNewTSafe() Timestamp { // block until any vChannel updating tSafe _, _, recvOK := reflect.Select(q.watcherSelectCase) if !recvOK { - //log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID)) + //log.Warn("tSafe has been closed", zap.Any("collectionID", q.collectionID)) return Timestamp(math.MaxInt64) } //log.Debug("wait new tSafe", zap.Any("collectionID", s.collectionID)) @@ -247,7 +247,7 @@ func (q *queryCollection) loadBalance(msg *msgstream.LoadBalanceSegmentsMsg) { // if nodeID == info.SourceNodeID { // err := s.historical.replica.removeSegment(segmentID) // if err != nil { - // log.Error("loadBalance failed when remove segment", + // log.Warn("loadBalance failed when remove segment", // zap.Error(err), // zap.Any("segmentID", segmentID)) // } @@ -255,7 +255,7 @@ func (q *queryCollection) loadBalance(msg *msgstream.LoadBalanceSegmentsMsg) { // if nodeID == info.DstNodeID { // segment, err := s.historical.replica.getSegmentByID(segmentID) // if err != nil { - // log.Error("loadBalance failed when making segment on service", + // log.Warn("loadBalance failed when making segment on service", // zap.Error(err), // zap.Any("segmentID", segmentID)) // continue // not return, try to load balance all segment @@ -290,11 +290,11 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { //) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) - log.Error(err.Error()) + log.Warn(err.Error()) return } if collectionID != q.collectionID { - //log.Error("not target collection query request", + //log.Warn("not target collection query request", // zap.Any("collectionID", q.collectionID), // zap.Int64("target collectionID", collectionID), // zap.Int64("msgID", msg.ID()), @@ -309,10 +309,10 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { // check if collection has been released collection, err := q.historical.replica.getCollectionByID(collectionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) err = q.publishFailedQueryResult(msg, err.Error()) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { log.Debug("do query failed in receiveQueryMsg, publish failed query result", zap.Int64("collectionID", collectionID), @@ -325,10 +325,10 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { guaranteeTs := msg.GuaranteeTs() if guaranteeTs >= collection.getReleaseTime() { err = fmt.Errorf("retrieve failed, collection has been released, msgID = %d, collectionID = %d", msg.ID(), collectionID) - log.Error(err.Error()) + log.Warn(err.Error()) err = q.publishFailedQueryResult(msg, err.Error()) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { log.Debug("do query failed in receiveQueryMsg, publish failed query result", zap.Int64("collectionID", collectionID), @@ -375,16 +375,16 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { err = q.search(msg) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) - log.Error(err.Error()) + log.Warn(err.Error()) return } tr.Record("operation done") if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) err = q.publishFailedQueryResult(msg, err.Error()) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { log.Debug("do query failed in receiveQueryMsg, publish failed query result", zap.Int64("collectionID", collectionID), @@ -468,15 +468,15 @@ func (q *queryCollection) doUnsolvedQueryMsg() { err = q.search(m) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) - log.Error(err.Error()) + log.Warn(err.Error()) return } if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) err = q.publishFailedQueryResult(m, err.Error()) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } else { log.Debug("do query failed in doUnsolvedMsg, publish failed query result", zap.Int64("collectionID", q.collectionID), @@ -849,7 +849,7 @@ func (q *queryCollection) search(msg queryMsg) error { // historical search hisSearchResults, hisSegmentResults, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp) if err1 != nil { - log.Error(err1.Error()) + log.Warn(err1.Error()) return err1 } searchResults = append(searchResults, hisSearchResults...) @@ -866,7 +866,7 @@ func (q *queryCollection) search(msg queryMsg) error { var strSegmentResults []*Segment strSearchResults, strSegmentResults, err2 = q.streaming.search(searchRequests, collectionID, searchMsg.PartitionIDs, channel, plan, travelTimestamp) if err2 != nil { - log.Error(err2.Error()) + log.Warn(err2.Error()) return err2 } searchResults = append(searchResults, strSearchResults...) diff --git a/internal/querynode/query_service.go b/internal/querynode/query_service.go index e5c8388832..8d072bb424 100644 --- a/internal/querynode/query_service.go +++ b/internal/querynode/query_service.go @@ -117,7 +117,7 @@ func (q *queryService) hasQueryCollection(collectionID UniqueID) bool { func (q *queryService) stopQueryCollection(collectionID UniqueID) { sc, ok := q.queryCollections[collectionID] if !ok { - log.Error("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID)) + log.Warn("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID)) return } sc.close() diff --git a/internal/querynode/reduce.go b/internal/querynode/reduce.go index 90e5da4582..a8cdfcba08 100644 --- a/internal/querynode/reduce.go +++ b/internal/querynode/reduce.go @@ -67,7 +67,7 @@ func fillTargetEntry(plan *SearchPlan, searchResults []*SearchResult, matchedSeg go func(i int) { err := matchedSegments[i].fillTargetEntry(plan, searchResults[i]) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } wg.Done() }(i) diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 3f42ec3534..37a33c7784 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -170,14 +170,14 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c var segmentPtr C.CSegmentInterface switch segType { case segmentTypeInvalid: - log.Error("illegal segment type when create segment") + log.Warn("illegal segment type when create segment") return nil case segmentTypeSealed: segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed) case segmentTypeGrowing: segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing) default: - log.Error("illegal segment type when create segment") + log.Warn("illegal segment type when create segment") return nil } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index cbaa95c07b..608703b2a5 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -101,13 +101,13 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer err = loader.loadSegmentInternal(collectionID, segment, info) if err != nil { deleteSegment(segment) - log.Error(err.Error()) + log.Warn(err.Error()) continue } err = loader.historicalReplica.setSegment(segment) if err != nil { deleteSegment(segment) - log.Error(err.Error()) + log.Warn(err.Error()) continue } if onService { @@ -115,14 +115,14 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer value, err := loader.etcdKV.Load(key) if err != nil { deleteSegment(segment) - log.Error("error when load segment info from etcd", zap.Any("error", err.Error())) + log.Warn("error when load segment info from etcd", zap.Any("error", err.Error())) continue } segmentInfo := &querypb.SegmentInfo{} err = proto.UnmarshalText(value, segmentInfo) if err != nil { deleteSegment(segment) - log.Error("error when unmarshal segment info from etcd", zap.Any("error", err.Error())) + log.Warn("error when unmarshal segment info from etcd", zap.Any("error", err.Error())) continue } segmentInfo.SegmentState = querypb.SegmentState_sealed @@ -130,7 +130,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer err = loader.etcdKV.Save(newKey, proto.MarshalTextString(segmentInfo)) if err != nil { deleteSegment(segment) - log.Error("error when update segment info to etcd", zap.Any("error", err.Error())) + log.Warn("error when update segment info to etcd", zap.Any("error", err.Error())) } } } @@ -220,7 +220,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog defer func() { err := iCodec.Close() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) } }() blobs := make([]*storage.Blob, 0) @@ -247,7 +247,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog _, _, insertData, err := iCodec.Deserialize(blobs) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return err } diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index a03bc5f337..c96955b9ce 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -143,13 +143,13 @@ func (s *streaming) search(searchReqs []*searchRequest, zap.Any("segmentIDs", segIDs), ) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return searchResults, segmentResults, err } for _, segID := range segIDs { seg, err := s.replica.getSegmentByID(segID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return searchResults, segmentResults, err } diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 0aa32e90a9..01b05814c6 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -90,7 +90,7 @@ func (b *baseTask) Notify(err error) { // watchDmChannelsTask func (w *watchDmChannelsTask) Timestamp() Timestamp { if w.req.Base == nil { - log.Error("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID)) + log.Warn("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID)) return 0 } return w.req.Base.Timestamp @@ -217,7 +217,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } err = w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return err } log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID)) @@ -262,7 +262,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { err := fg.consumerFlowGraph(VPChannels[channel], consumeSubName) if err != nil { errMsg := "msgStream consume error :" + err.Error() - log.Error(errMsg) + log.Warn(errMsg) return errors.New(errMsg) } } @@ -282,7 +282,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { err := fg.seekQueryNodeFlowGraph(pos) if err != nil { errMsg := "msgStream seek error :" + err.Error() - log.Error(errMsg) + log.Warn(errMsg) return errors.New(errMsg) } } @@ -316,7 +316,7 @@ func (w *watchDmChannelsTask) PostExecute(ctx context.Context) error { // loadSegmentsTask func (l *loadSegmentsTask) Timestamp() Timestamp { if l.req.Base == nil { - log.Error("nil base req in loadSegmentsTask") + log.Warn("nil base req in loadSegmentsTask") return 0 } return l.req.Base.Timestamp @@ -352,7 +352,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { } if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return err } @@ -382,7 +382,7 @@ func (l *loadSegmentsTask) PostExecute(ctx context.Context) error { // releaseCollectionTask func (r *releaseCollectionTask) Timestamp() Timestamp { if r.req.Base == nil { - log.Error("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID)) + log.Warn("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID)) return 0 } return r.req.Base.Timestamp @@ -405,7 +405,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID)) collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return err } collection.setReleaseTime(r.req.Base.Timestamp) @@ -435,7 +435,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { if hasCollectionInHistorical { err := r.node.historical.replica.removeCollection(r.req.CollectionID) if err != nil { - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) return } } @@ -444,7 +444,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { if hasCollectionInStreaming { err := r.node.streaming.replica.removeCollection(r.req.CollectionID) if err != nil { - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) return } } @@ -465,7 +465,7 @@ func (r *releaseCollectionTask) PostExecute(ctx context.Context) error { // releasePartitionsTask func (r *releasePartitionsTask) Timestamp() Timestamp { if r.req.Base == nil { - log.Error("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID)) + log.Warn("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID)) return 0 } return r.req.Base.Timestamp @@ -496,13 +496,13 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) if err != nil { - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) return } sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) if err != nil { - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) return } @@ -526,7 +526,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { err = r.node.historical.replica.removePartition(id) if err != nil { // not return, try to release all partitions - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) } } hCol.addReleasedPartition(id) @@ -535,7 +535,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { if hasPartitionInStreaming { err = r.node.streaming.replica.removePartition(id) if err != nil { - log.Error(errMsg + err.Error()) + log.Warn(errMsg + err.Error()) } } sCol.addReleasedPartition(id) diff --git a/internal/querynode/task_scheduler.go b/internal/querynode/task_scheduler.go index 1908f18534..2276778907 100644 --- a/internal/querynode/task_scheduler.go +++ b/internal/querynode/task_scheduler.go @@ -44,7 +44,7 @@ func (s *taskScheduler) processTask(t task, q taskQueue) { t.Notify(err) }() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } @@ -55,7 +55,7 @@ func (s *taskScheduler) processTask(t task, q taskQueue) { err = t.Execute(s.ctx) if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } err = t.PostExecute(s.ctx) diff --git a/internal/querynode/tsafe_replica.go b/internal/querynode/tsafe_replica.go index 8d15501201..70dc9e484f 100644 --- a/internal/querynode/tsafe_replica.go +++ b/internal/querynode/tsafe_replica.go @@ -40,7 +40,7 @@ func (t *tSafeReplica) getTSafe(vChannel Channel) Timestamp { defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { - log.Error("get tSafe failed", zap.Error(err)) + log.Warn("get tSafe failed", zap.Error(err)) return 0 } return safer.get() @@ -51,7 +51,7 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, id UniqueID, timestamp Timesta defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { - log.Error("set tSafe failed", zap.Error(err)) + log.Warn("set tSafe failed", zap.Error(err)) return } safer.set(id, timestamp) @@ -60,7 +60,7 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, id UniqueID, timestamp Timesta func (t *tSafeReplica) getTSaferPrivate(vChannel Channel) (tSafer, error) { if _, ok := t.tSafes[vChannel]; !ok { err := errors.New("cannot found tSafer, vChannel = " + vChannel) - //log.Error(err.Error()) + //log.Warn(err.Error()) return nil, err } return t.tSafes[vChannel], nil @@ -75,7 +75,7 @@ func (t *tSafeReplica) addTSafe(vChannel Channel) { t.tSafes[vChannel].start() log.Debug("add tSafe done", zap.Any("channel", vChannel)) } else { - log.Error("tSafe has been existed", zap.Any("channel", vChannel)) + log.Warn("tSafe has been existed", zap.Any("channel", vChannel)) } } @@ -98,7 +98,7 @@ func (t *tSafeReplica) registerTSafeWatcher(vChannel Channel, watcher *tSafeWatc defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { - log.Error("register tSafe watcher failed", zap.Error(err)) + log.Warn("register tSafe watcher failed", zap.Error(err)) return } safer.registerTSafeWatcher(watcher)