diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go
index 7c12be21b2..0cc15e86ad 100644
--- a/internal/querycoord/query_coord.go
+++ b/internal/querycoord/query_coord.go
@@ -592,7 +592,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 				// auto balance is executed on replica level
 				onlineNodeIDs := replica.GetNodeIds()
 				if len(onlineNodeIDs) == 0 {
-					log.Error("loadBalanceSegmentLoop: there are no online QueryNode to balance")
+					log.Error("loadBalanceSegmentLoop: there are no online QueryNode to balance", zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID))
 					continue
 				}
 				var availableNodeIDs []int64
@@ -601,7 +601,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 					if _, ok := nodeID2MemUsage[nodeID]; !ok {
 						nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
 						if err != nil {
-							log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed", zap.Int64("nodeID", nodeID), zap.Error(err))
+							log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed",
+								zap.Int64("nodeID", nodeID), zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
+								zap.Error(err))
 							continue
 						}
 						nodeID2MemUsageRate[nodeID] = nodeInfo.(*queryNode).memUsageRate
@@ -615,7 +617,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 					for _, segmentInfo := range segmentInfos {
 						leastInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
 						if err != nil {
-							log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID), zap.Error(err))
+							log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID),
+								zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
+								zap.Error(err))
 							updateSegmentInfoDone = false
 							break
 						}
@@ -626,9 +630,12 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 						nodeID2SegmentInfos[nodeID] = leastSegmentInfos
 					}
 				}
-				log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate))
+				log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Int64("collection", replica.CollectionID),
+					zap.Int64("replica", replica.ReplicaID), zap.Any("mem rate", nodeID2MemUsageRate))
 				if len(availableNodeIDs) <= 1 {
-					log.Warn("loadBalanceSegmentLoop: there are too few available query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
+					log.Info("loadBalanceSegmentLoop: there are too few available query nodes to balance",
+						zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
+						zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
 					continue
 				}
@@ -678,6 +685,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 							cluster: qc.cluster,
 							meta:    qc.meta,
 						}
+						log.Info("loadBalanceSegmentLoop: generate a loadBalance task",
+							zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
+							zap.Any("task", balanceTask))
 						loadBalanceTasks = append(loadBalanceTasks, balanceTask)
 						nodeID2MemUsage[sourceNodeID] -= uint64(selectedSegmentInfo.MemSize)
 						nodeID2MemUsage[dstNodeID] += uint64(selectedSegmentInfo.MemSize)
@@ -690,13 +700,12 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 					if memoryInsufficient {
 						// no enough memory on query nodes to balance, then notify proxy to stop insert
 						//TODO:: xige-16
-						log.Warn("loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data")
+						log.Warn("loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data", zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID))
 					}
 				}
 			}
 			for _, t := range loadBalanceTasks {
 				qc.scheduler.Enqueue(t)
-				log.Info("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
 				err := t.waitToFinish()
 				if err != nil {
 					// if failed, wait for next balance loop
@@ -707,7 +716,6 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
 					log.Info("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
 				}
 			}
-			log.Info("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
 		}
 	}
 }
diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go
index 358195ec9a..202b920928 100644
--- a/internal/querynode/collection.go
+++ b/internal/querynode/collection.go
@@ -83,7 +83,7 @@ func (c *Collection) addPartitionID(partitionID UniqueID) {
 	defer c.releaseMu.Unlock()
 
 	c.partitionIDs = append(c.partitionIDs, partitionID)
-	log.Debug("queryNode collection info after add a partition",
+	log.Info("queryNode collection info after add a partition",
 		zap.Int64("partitionID", partitionID), zap.Int64("collectionID", c.id),
 		zap.Int64s("partitions", c.partitionIDs))
 }
@@ -107,14 +107,14 @@ OUTER:
 	for _, dstChan := range channels {
 		for _, srcChan := range c.vChannels {
 			if dstChan == srcChan {
-				log.Debug("vChannel has been existed in collection's vChannels",
+				log.Warn("vChannel has been existed in collection's vChannels",
 					zap.Int64("collectionID", c.ID()),
 					zap.String("vChannel", dstChan),
 				)
 				continue OUTER
 			}
 		}
-		log.Debug("add vChannel to collection",
+		log.Info("add vChannel to collection",
 			zap.Int64("collectionID", c.ID()),
 			zap.String("vChannel", dstChan),
 		)
@@ -144,7 +144,7 @@ func (c *Collection) removeVChannel(channel Channel) {
 		}
 	}
 	c.vChannels = tmpChannels
-	log.Debug("remove vChannel from collection",
+	log.Info("remove vChannel from collection",
 		zap.Int64("collectionID", c.ID()),
 		zap.String("channel", channel),
 	)
@@ -160,14 +160,14 @@ OUTER:
 	for _, dstChan := range channels {
 		for _, srcChan := range c.pChannels {
 			if dstChan == srcChan {
-				log.Debug("pChannel has been existed in collection's pChannels",
+				log.Info("pChannel has been existed in collection's pChannels",
 					zap.Int64("collectionID", c.ID()),
 					zap.String("pChannel", dstChan),
 				)
 				continue OUTER
 			}
 		}
-		log.Debug("add pChannel to collection",
+		log.Info("add pChannel to collection",
 			zap.Int64("collectionID", c.ID()),
 			zap.String("pChannel", dstChan),
 		)
@@ -192,14 +192,14 @@ OUTER:
 	for _, dstChan := range channels {
 		for _, srcChan := range c.pDeltaChannels {
 			if dstChan == srcChan {
-				log.Debug("pChannel has been existed in collection's pChannels",
+				log.Info("pChannel has been existed in collection's pChannels",
 					zap.Int64("collectionID", c.ID()),
 					zap.String("pChannel", dstChan),
 				)
 				continue OUTER
 			}
 		}
-		log.Debug("add pChannel to collection",
+		log.Info("add pChannel to collection",
 			zap.Int64("collectionID", c.ID()),
 			zap.String("pChannel", dstChan),
 		)
@@ -232,14 +232,14 @@ OUTER:
 	for _, dstChan := range channels {
 		for _, srcChan := range c.vDeltaChannels {
 			if dstChan == srcChan {
-				log.Debug("vDeltaChannel has been existed in collection's vDeltaChannels",
+				log.Info("vDeltaChannel has been existed in collection's vDeltaChannels",
 					zap.Int64("collectionID", c.ID()),
 					zap.String("vChannel", dstChan),
 				)
 				continue OUTER
 			}
 		}
-		log.Debug("add vDeltaChannel to collection",
+		log.Info("add vDeltaChannel to collection",
zap.Int64("collectionID", c.ID()), zap.String("vDeltaChannel", dstChan), ) @@ -259,7 +259,7 @@ func (c *Collection) removeVDeltaChannel(channel Channel) { } } c.vDeltaChannels = tmpChannels - log.Debug("remove vDeltaChannel from collection", + log.Info("remove vDeltaChannel from collection", zap.Int64("collectionID", c.ID()), zap.String("channel", channel), ) @@ -323,7 +323,7 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co } C.free(unsafe.Pointer(cSchemaBlob)) - log.Debug("create collection", zap.Int64("collectionID", collectionID)) + log.Info("create collection", zap.Int64("collectionID", collectionID)) newCollection.setReleaseTime(Timestamp(math.MaxUint64)) return newCollection @@ -340,7 +340,7 @@ func deleteCollection(collection *Collection) { collection.collectionPtr = nil - log.Debug("delete collection", zap.Int64("collectionID", collection.ID())) + log.Info("delete collection", zap.Int64("collectionID", collection.ID())) collection = nil } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index f302543f40..0f5f2c03fc 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -181,10 +181,10 @@ func (colReplica *collectionReplica) printReplica() { colReplica.mu.Lock() defer colReplica.mu.Unlock() - log.Debug("collections in collectionReplica", zap.Any("info", colReplica.collections)) - log.Debug("partitions in collectionReplica", zap.Any("info", colReplica.partitions)) - log.Debug("segments in collectionReplica", zap.Any("info", colReplica.segments)) - log.Debug("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments)) + log.Info("collections in collectionReplica", zap.Any("info", colReplica.collections)) + log.Info("partitions in collectionReplica", zap.Any("info", colReplica.partitions)) + log.Info("segments in collectionReplica", zap.Any("info", colReplica.segments)) + log.Info("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments)) } //----------------------------------------------------------------------------------------------------- collection @@ -210,8 +210,6 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema var newCollection = newCollection(collectionID, schema) colReplica.collections[collectionID] = newCollection - log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID)) - metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.collections))) return newCollection } diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 477a5941df..dc317d4bf2 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -73,7 +73,7 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu for channel, fg := range results { dsService.dmlChannel2FlowGraph[channel] = fg - log.Debug("add DML flow graph", + log.Info("add DML flow graph", zap.Any("collectionID", collectionID), zap.Any("channel", channel)) metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc() @@ -113,7 +113,7 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni for channel, fg := range results { dsService.deltaChannel2FlowGraph[channel] = fg - log.Debug("add delta flow graph", + log.Info("add delta flow graph", zap.Any("collectionID", 
collectionID), zap.Any("channel", channel)) metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc() @@ -156,7 +156,7 @@ func (dsService *dataSyncService) startFlowGraphByDMLChannel(collectionID Unique if _, ok := dsService.dmlChannel2FlowGraph[channel]; !ok { return fmt.Errorf("DML flow graph doesn't existed, collectionID = %d", collectionID) } - log.Debug("start DML flow graph", + log.Info("start DML flow graph", zap.Any("collectionID", collectionID), zap.Any("channel", channel), ) @@ -172,7 +172,7 @@ func (dsService *dataSyncService) startFlowGraphForDeltaChannel(collectionID Uni if _, ok := dsService.deltaChannel2FlowGraph[channel]; !ok { return fmt.Errorf("delta flow graph doesn't existed, collectionID = %d", collectionID) } - log.Debug("start delta flow graph", + log.Info("start delta flow graph", zap.Any("collectionID", collectionID), zap.Any("channel", channel), ) diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index e4a6a93e78..e91cca0dad 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -17,6 +17,7 @@ package querynode import ( + "reflect" "sync" "github.com/opentracing/opentracing-go" @@ -55,7 +56,11 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { dMsg, ok := in[0].(*deleteMsg) if !ok { - log.Warn("type assertion failed for deleteMsg") + if in[0] == nil { + log.Debug("type assertion failed for deleteMsg because it's nil") + } else { + log.Warn("type assertion failed for deleteMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + } return []Msg{} } @@ -79,17 +84,18 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // 1. filter segment by bloom filter for i, delMsg := range dMsg.deleteMessages { traceID, _, _ := trace.InfoFromSpan(spans[i]) - log.Debug("Process delete request in QueryNode", zap.String("traceID", traceID)) + log.Debug("delete in historical replica", + zap.Any("collectionID", delMsg.CollectionID), + zap.Any("collectionName", delMsg.CollectionName), + zap.Int64("numPKs", delMsg.NumRows), + zap.Int("numTS", len(delMsg.Timestamps)), + zap.Any("timestampBegin", delMsg.BeginTs()), + zap.Any("timestampEnd", delMsg.EndTs()), + zap.Any("segmentNum", dNode.replica.getSegmentNum()), + zap.Any("traceID", traceID), + ) if dNode.replica.getSegmentNum() != 0 { - log.Debug("delete in historical replica", - zap.Any("collectionID", delMsg.CollectionID), - zap.Any("collectionName", delMsg.CollectionName), - zap.Int64("numPKs", delMsg.NumRows), - zap.Int("numTS", len(delMsg.Timestamps)), - zap.Any("timestampBegin", delMsg.BeginTs()), - zap.Any("timestampEnd", delMsg.EndTs()), - ) processDeleteMessages(dNode.replica, delMsg, delData) } } @@ -98,7 +104,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID, pks := range delData.deleteIDs { segment, err := dNode.replica.getSegmentByID(segmentID) if err != nil { - log.Debug(err.Error()) + log.Debug("failed to get segment", zap.Int64("segmentId", segmentID), zap.Error(err)) continue } offset := segment.segmentPreDelete(len(pks)) @@ -126,7 +132,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // delete will do delete operation at segment which id is segmentID func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) { defer wg.Done() - log.Debug("QueryNode::dNode::delete", zap.Any("SegmentID", segmentID)) targetSegment, err := 
 	targetSegment, err := dNode.replica.getSegmentByID(segmentID)
 	if err != nil {
 		log.Error(err.Error())
diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go
index 207fa3327e..5683c78c75 100644
--- a/internal/querynode/flow_graph_filter_delete_node.go
+++ b/internal/querynode/flow_graph_filter_delete_node.go
@@ -18,6 +18,7 @@ package querynode
 
 import (
 	"fmt"
+	"reflect"
 
 	"github.com/opentracing/opentracing-go"
 	"go.uber.org/zap"
@@ -50,7 +51,11 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 	msgStreamMsg, ok := in[0].(*MsgStreamMsg)
 	if !ok {
-		log.Warn("type assertion failed for MsgStreamMsg")
+		if in[0] == nil {
+			log.Debug("type assertion failed for MsgStreamMsg because it's nil")
+		} else {
+			log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
+		}
 		return []Msg{}
 	}
diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go
index 22809feaa5..60798fbfa9 100644
--- a/internal/querynode/flow_graph_filter_dm_node.go
+++ b/internal/querynode/flow_graph_filter_dm_node.go
@@ -18,6 +18,7 @@ package querynode
 
 import (
 	"fmt"
+	"reflect"
 
 	"github.com/opentracing/opentracing-go"
 	"go.uber.org/zap"
@@ -43,8 +44,6 @@ func (fdmNode *filterDmNode) Name() string {
 
 // Operate handles input messages, to filter invalid insert messages
 func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
-	//log.Debug("Do filterDmNode operation")
-
 	if len(in) != 1 {
 		log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
 		return []Msg{}
@@ -52,7 +51,11 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 	msgStreamMsg, ok := in[0].(*MsgStreamMsg)
 	if !ok {
-		log.Warn("type assertion failed for MsgStreamMsg")
+		if in[0] == nil {
+			log.Debug("type assertion failed for MsgStreamMsg because it's nil")
+		} else {
+			log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
+		}
 		return []Msg{}
 	}
 
@@ -78,7 +81,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 	for i, msg := range msgStreamMsg.TsMessages() {
 		traceID, _, _ := trace.InfoFromSpan(spans[i])
-		log.Info("Filter invalid message in QueryNode", zap.String("traceID", traceID))
+		log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID))
 		switch msg.Type() {
 		case commonpb.MsgType_Insert:
 			resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
@@ -192,7 +195,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	for _, segmentInfo := range excludedSegments {
 		// unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil
 		if segmentInfo.DmlPosition == nil {
-			log.Debug("filter unFlushed segment without checkPoint",
+			log.Warn("filter unFlushed segment without checkPoint",
 				zap.Any("collectionID", msg.CollectionID),
 				zap.Any("partitionID", msg.PartitionID))
 			return nil
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index a926b0801e..8e7b84cc57 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"reflect"
 	"sort"
 	"strconv"
 	"sync"
@@ -69,8 +70,6 @@ func (iNode *insertNode) Name() string {
 
 // Operate handles input messages, to execute insert operations
 func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
-	//log.Debug("Do insertNode operation")
-
 	if len(in) != 1 {
 		log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
 		return []Msg{}
@@ -78,7 +77,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 	iMsg, ok := in[0].(*insertMsg)
 	if !ok {
-		log.Warn("type assertion failed for insertMsg")
+		if in[0] == nil {
+			log.Debug("type assertion failed for insertMsg because it's nil")
+		} else {
+			log.Warn("type assertion failed for insertMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
+		}
 		return []Msg{}
 	}
 
@@ -111,13 +114,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		// if loadType is loadCollection, check if partition exists, if not, create partition
 		col, err := iNode.streamingReplica.getCollectionByID(insertMsg.CollectionID)
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn("failed to get collection", zap.Error(err))
 			continue
 		}
 		if col.getLoadType() == loadTypeCollection {
 			err = iNode.streamingReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID)
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn("failed to add partition", zap.Error(err))
 				continue
 			}
 		}
@@ -126,14 +129,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		if !iNode.streamingReplica.hasSegment(insertMsg.SegmentID) {
 			err := iNode.streamingReplica.addSegment(insertMsg.SegmentID, insertMsg.PartitionID, insertMsg.CollectionID, insertMsg.ShardName, segmentTypeGrowing, true)
 			if err != nil {
-				log.Warn(err.Error())
+				log.Warn("failed to add segment", zap.Error(err))
 				continue
 			}
 		}
 
 		insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg)
 		if err != nil {
-			log.Error("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err))
+			log.Warn("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err))
 			return []Msg{}
 		}
 
@@ -146,7 +149,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		}
 		pks, err := getPrimaryKeys(insertMsg, iNode.streamingReplica)
 		if err != nil {
-			log.Warn(err.Error())
+			log.Warn("failed to get primary keys", zap.Error(err))
 			continue
 		}
 		iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...)
@@ -298,13 +301,11 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm
 			retTss = append(retTss, timestamps[index])
 		}
 	}
-	log.Debug("In filterSegmentsByPKs", zap.Any("pk len", len(retPks)), zap.Any("segment", segment.segmentID))
 	return retPks, retTss, nil
 }
 
 // insert would execute insert operations for specific growing segment
 func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
-	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
 	var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
 	if err != nil {
 		log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
@@ -341,10 +342,9 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
 // delete would execute delete operations for specific growing segment
 func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
 	defer wg.Done()
-	log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
 	targetSegment, err := iNode.streamingReplica.getSegmentByID(segmentID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
 
@@ -416,7 +416,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
 			if t.Key == "dim" {
 				dim, err := strconv.Atoi(t.Value)
 				if err != nil {
-					log.Error("strconv wrong on get dim", zap.Error(err))
+					log.Warn("strconv wrong on get dim", zap.Error(err))
 					break
 				}
 				offset += dim * 4
@@ -428,7 +428,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
 			if t.Key == "dim" {
 				dim, err := strconv.Atoi(t.Value)
 				if err != nil {
-					log.Error("strconv wrong on get dim", zap.Error(err))
+					log.Warn("strconv wrong on get dim", zap.Error(err))
 					return nil, err
 				}
 				offset += dim / 8
diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go
index 9c7853e439..9dfcb5ed6c 100644
--- a/internal/querynode/flow_graph_query_node.go
+++ b/internal/querynode/flow_graph_query_node.go
@@ -204,7 +204,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSu
 		return errors.New("null dml message stream in flow graph")
 	}
 	q.dmlStream.AsConsumer([]string{channel}, subName)
-	log.Debug("query node flow graph consumes from pChannel",
+	log.Info("query node flow graph consumes from pChannel",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", channel),
 		zap.Any("subName", subName),
@@ -220,7 +220,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
 		return errors.New("null dml message stream in flow graph")
 	}
 	q.dmlStream.AsConsumerWithPosition([]string{channel}, subName, mqwrapper.SubscriptionPositionLatest)
-	log.Debug("query node flow graph consumes from pChannel",
+	log.Info("query node flow graph consumes from pChannel",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", channel),
 		zap.Any("subName", subName),
@@ -234,7 +234,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
 func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
 	q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup)
 	err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
-	log.Debug("query node flow graph seeks from pChannel",
+	log.Info("query node flow graph seeks from pChannel",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", position.ChannelName),
 	)
@@ -250,7 +250,7 @@ func (q *queryNodeFlowGraph) close() {
 	if q.dmlStream != nil && q.consumerCnt > 0 {
 		metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))
 	}
-	log.Debug("stop query node flow graph",
+	log.Info("stop query node flow graph",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", q.channel),
 	)
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 16918fccb4..b751a33ddd 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -18,11 +18,12 @@ package querynode
 
 import (
 	"fmt"
-
-	"go.uber.org/zap"
+	"reflect"
 
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
+	"github.com/milvus-io/milvus/internal/util/tsoutil"
Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID())) - + log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID())) waitFunc := func() (*commonpb.Status, error) { err = dct.WaitToFinish() if err != nil { @@ -236,10 +234,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) + log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil @@ -274,10 +272,11 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) + + log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) waitFunc := func() (*commonpb.Status, error) { err = dct.WaitToFinish() @@ -286,10 +285,11 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) + + log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil @@ -324,14 +324,14 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } segmentIDs := make([]UniqueID, 0) for _, info := range in.Infos { segmentIDs = append(segmentIDs, info.SegmentID) } - log.Debug("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) + log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) waitFunc := func() (*commonpb.Status, error) { err = dct.WaitToFinish() @@ -340,10 +340,10 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) + log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", 
segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil @@ -378,18 +378,18 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID)) + log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID)) func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } - log.Debug("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID)) + log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID)) }() status := &commonpb.Status{ @@ -424,18 +424,18 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), } - log.Error(err.Error()) + log.Warn(err.Error()) return status, nil } - log.Debug("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs)) + log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs)) func() { err = dct.WaitToFinish() if err != nil { - log.Error(err.Error()) + log.Warn(err.Error()) return } - log.Debug("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs)) + log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs)) }() status := &commonpb.Status{ @@ -473,7 +473,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS } } - log.Debug("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs)) + log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs)) return status, nil } @@ -500,7 +500,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen // get info from historical historicalSegmentInfos, err := node.historical.replica.getSegmentInfosByColID(in.CollectionID) if err != nil { - log.Debug("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err)) + log.Warn("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err)) res := &queryPb.GetSegmentInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -514,7 +514,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen // get info from streaming streamingSegmentInfos, err := node.streaming.replica.getSegmentInfosByColID(in.CollectionID) if err != nil { - log.Debug("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err)) + log.Warn("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err)) res := &queryPb.GetSegmentInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index d411f1ffd2..5d506fe504 100644 --- 
+++ b/internal/querynode/load_index_info.go
@@ -32,12 +32,8 @@ import (
 	"path/filepath"
 	"unsafe"
 
-	"github.com/milvus-io/milvus/internal/proto/schemapb"
-
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/util/funcutil"
 )
 
@@ -112,7 +108,6 @@ func (li *LoadIndexInfo) appendIndexData(bytesIndex [][]byte, indexKeys []string
 		indexPtr := unsafe.Pointer(&byteIndex[0])
 		indexLen := C.int64_t(len(byteIndex))
 		binarySetKey := filepath.Base(indexKeys[i])
-		log.Debug("", zap.String("index key", binarySetKey))
 		indexKey := C.CString(binarySetKey)
 		status = C.AppendIndexBinary(cBinarySet, indexPtr, indexLen, indexKey)
 		C.free(unsafe.Pointer(indexKey))
diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go
index e1cf96f2ee..1e72d8108f 100644
--- a/internal/querynode/partition.go
+++ b/internal/querynode/partition.go
@@ -49,7 +49,7 @@ func (p *Partition) ID() UniqueID {
 // addSegmentID add segmentID to segmentIDs
 func (p *Partition) addSegmentID(segmentID UniqueID) {
 	p.segmentIDs = append(p.segmentIDs, segmentID)
-	log.Debug("add a segment to replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
+	log.Info("add a segment to replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
 }
 
 // removeSegmentID removes segmentID from segmentIDs
@@ -61,7 +61,7 @@ func (p *Partition) removeSegmentID(segmentID UniqueID) {
 		}
 	}
 	p.segmentIDs = tmpIDs
-	log.Debug("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
+	log.Info("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
 }
 
 // newPartition returns a new Partition
@@ -71,6 +71,6 @@ func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition {
 		partitionID: partitionID,
 	}
 
-	log.Debug("create partition", zap.Int64("partitionID", partitionID))
+	log.Info("create partition", zap.Int64("partitionID", partitionID))
 	return newPartition
 }
diff --git a/internal/querynode/proxy_session_manager.go b/internal/querynode/proxy_session_manager.go
index 8797f017a6..8b72a83cbc 100644
--- a/internal/querynode/proxy_session_manager.go
+++ b/internal/querynode/proxy_session_manager.go
@@ -58,7 +58,7 @@ func NewSessionManager(options ...SessionOpt) *SessionManager {
 func (c *SessionManager) AddSession(node *NodeInfo) {
 	c.sessions.Lock()
 	defer c.sessions.Unlock()
-
+	log.Info("add proxy session", zap.Int64("node", node.NodeID))
 	session := NewSession(node, c.sessionCreator)
 	c.sessions.data[node.NodeID] = session
 }
@@ -73,7 +73,7 @@ func (c *SessionManager) Startup(nodes []*NodeInfo) {
 func (c *SessionManager) DeleteSession(node *NodeInfo) {
 	c.sessions.Lock()
 	defer c.sessions.Unlock()
-
+	log.Info("delete proxy session", zap.Int64("node", node.NodeID))
 	if session, ok := c.sessions.data[node.NodeID]; ok {
 		session.Dispose()
 		delete(c.sessions.data, node.NodeID)
diff --git a/internal/querynode/query_channel.go b/internal/querynode/query_channel.go
index d5a9c6df82..9d8b1fe825 100644
--- a/internal/querynode/query_channel.go
+++ b/internal/querynode/query_channel.go
@@ -44,11 +44,11 @@ func (qc *queryChannel) AsConsumer(channelName string, subName string, position
 		qc.queryMsgStream.AsConsumer([]string{channelName}, subName)
 		metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
 		if position == nil || len(position.MsgID) == 0 {
-			log.Debug("QueryNode AsConsumer", zap.String("channel", channelName), zap.String("sub name", subName))
+			log.Info("QueryNode AsConsumer", zap.String("channel", channelName), zap.String("sub name", subName))
 		} else {
 			err = qc.queryMsgStream.Seek([]*internalpb.MsgPosition{position})
 			if err == nil {
-				log.Debug("querynode seek query channel: ", zap.Any("consumeChannel", channelName),
+				log.Info("querynode seek query channel: ", zap.Any("consumeChannel", channelName),
 					zap.String("seek position", string(position.MsgID)))
 			}
 		}
diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go
index 5e8a059464..76b9eae71e 100644
--- a/internal/querynode/query_collection.go
+++ b/internal/querynode/query_collection.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"reflect"
 	"sync"
 	"time"
 
@@ -181,7 +182,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
 			return err
 		}
 	}
-	log.Debug("register tSafe watcher and init watcher select case",
+	log.Info("register tSafe watcher and init watcher select case",
 		zap.Any("collectionID", streamingCollection.ID()),
 		zap.Any("dml channels", streamingCollection.getVChannels()))
 
@@ -195,7 +196,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
 			return err
 		}
 	}
-	log.Debug("register tSafe watcher and init watcher select case",
+	log.Info("register tSafe watcher and init watcher select case",
 		zap.Any("collectionID", historicalCollection.ID()),
 		zap.Any("delta channels", historicalCollection.getVDeltaChannels()))
 
@@ -215,9 +216,10 @@ func (q *queryCollection) addTSafeWatcher(vChannel Channel) error {
 	q.tSafeWatchers[vChannel] = newTSafeWatcher()
 	err := q.streaming.tSafeReplica.registerTSafeWatcher(vChannel, q.tSafeWatchers[vChannel])
 	if err != nil {
+		log.Warn("failed to register tsafe watcher", zap.Error(err))
 		return err
 	}
-	log.Debug("add tSafeWatcher to queryCollection",
+	log.Info("add tSafeWatcher to queryCollection",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", vChannel),
 	)
@@ -236,7 +238,7 @@ func (q *queryCollection) removeTSafeWatcher(channel Channel) error {
 	}
 	q.tSafeWatchers[channel].close()
 	delete(q.tSafeWatchers, channel)
-	log.Debug("remove tSafeWatcher from queryCollection",
+	log.Info("remove tSafeWatcher from queryCollection",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", channel),
 	)
@@ -247,10 +249,10 @@ func (q *queryCollection) startWatcher(channel <-chan bool, closeCh <-chan struc
 	for {
 		select {
 		case <-q.releaseCtx.Done():
-			log.Debug("stop queryCollection watcher because queryCollection ctx done", zap.Any("collectionID", q.collectionID))
+			log.Info("stop queryCollection watcher because queryCollection ctx done", zap.Any("collectionID", q.collectionID))
 			return
 		case <-closeCh:
-			log.Debug("stop queryCollection watcher because watcher closed", zap.Any("collectionID", q.collectionID))
+			log.Info("stop queryCollection watcher because watcher closed", zap.Any("collectionID", q.collectionID))
 			return
 		case <-channel:
 			// TODO: check if channel is closed
@@ -352,7 +354,7 @@ func (q *queryCollection) consumeQuery() {
 	for {
 		select {
 		case <-q.releaseCtx.Done():
-			log.Debug("stop queryCollection's receiveQueryMsg", zap.Int64("collectionID", q.collectionID))
+			log.Info("stop queryCollection's receiveQueryMsg", zap.Int64("collectionID", q.collectionID))
zap.Int64("collectionID", q.collectionID)) return case msgPack, ok := <-q.queryMsgStream.Chan(): if !ok { @@ -378,7 +380,7 @@ func (q *queryCollection) consumeQuery() { case *msgstream.SealedSegmentsChangeInfoMsg: q.adjustByChangeInfo(sm) default: - log.Warn("unsupported msg type in search channel", zap.Int64("msgID", sm.ID())) + log.Warn("unsupported msg type in search channel", zap.Int64("msgID", sm.ID()), zap.String("name", reflect.TypeOf(msg).Name())) } } } @@ -446,17 +448,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { case commonpb.MsgType_Retrieve: collectionID = msg.(*msgstream.RetrieveMsg).CollectionID msgTypeStr = "retrieve" - //log.Debug("consume retrieve message", - // zap.Any("collectionID", collectionID), - // zap.Int64("msgID", msg.ID()), - //) case commonpb.MsgType_Search: collectionID = msg.(*msgstream.SearchMsg).CollectionID msgTypeStr = "search" - //log.Debug("consume search message", - // zap.Any("collectionID", collectionID), - // zap.Int64("msgID", msg.ID()), - //) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) return err @@ -493,7 +487,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { finalErr := fmt.Errorf("first err = %s, second err = %s", err, publishErr) return finalErr } - log.Debug("do query failed in receiveQueryMsg, publish failed query result", + log.Warn("do query failed in receiveQueryMsg, publish failed query result", zap.Int64("collectionID", collectionID), zap.Int64("msgID", msg.ID()), zap.String("msgType", msgTypeStr), @@ -508,7 +502,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { finalErr := fmt.Errorf("first err = %s, second err = %s", err, publishErr) return finalErr } - log.Debug("do query failed in receiveQueryMsg, publish failed query result", + log.Warn("do query failed in receiveQueryMsg, publish failed query result", zap.Int64("collectionID", collectionID), zap.Int64("msgID", msg.ID()), zap.String("msgType", msgTypeStr), @@ -588,7 +582,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() { for { select { case <-q.releaseCtx.Done(): - log.Debug("stop Collection's doUnsolvedMsg", zap.Int64("collectionID", q.collectionID)) + log.Info("stop Collection's doUnsolvedMsg", zap.Int64("collectionID", q.collectionID)) return default: //time.Sleep(10 * time.Millisecond) @@ -628,10 +622,10 @@ func (q *queryCollection) doUnsolvedQueryMsg() { err := errors.New(fmt.Sprintln("do query failed in doUnsolvedQueryMsg because timeout"+ ", collectionID = ", q.collectionID, ", msgID = ", m.ID())) - log.Warn(err.Error()) + log.Warn("query timeout", zap.Error(err)) publishErr := q.publishFailedQueryResult(m, err.Error()) if publishErr != nil { - log.Error(publishErr.Error()) + log.Warn("failed to publish failed result", zap.Error(publishErr)) } continue } @@ -678,24 +672,22 @@ func (q *queryCollection) doUnsolvedQueryMsg() { } if err != nil { - log.Warn(err.Error()) + log.Debug("do query failed in doUnsolvedMsg, publish failed query result", + zap.Int64("collectionID", q.collectionID), + zap.Int64("msgID", m.ID()), + ) err = q.publishFailedQueryResult(m, err.Error()) if err != nil { - log.Warn(err.Error()) - } else { - log.Debug("do query failed in doUnsolvedMsg, publish failed query result", - zap.Int64("collectionID", q.collectionID), - zap.Int64("msgID", m.ID()), - ) + log.Warn("failed to publish failed result", zap.Error(err)) } + } else { + log.Debug("do query done in doUnsolvedMsg", + zap.Int64("collectionID", q.collectionID), + zap.Int64("msgID", m.ID()), + ) } sp.Finish() - 
log.Debug("do query done in doUnsolvedMsg", - zap.Int64("collectionID", q.collectionID), - zap.Int64("msgID", m.ID()), - ) } - log.Debug("doUnsolvedMsg: do query done", zap.Int("num of query msg", len(unSolvedMsg))) } } } @@ -854,7 +846,6 @@ func (q *queryCollection) search(msg queryMsg) error { log.Debug("QueryNode reduce data finished", zap.Int64("msgID", searchMsg.ID())) sp.LogFields(oplog.String("statistical time", "reduceSearchResults end")) if err != nil { - log.Error("QueryNode reduce data failed", zap.Int64("msgID", searchMsg.ID()), zap.Error(err)) return err } nqOfReqs := []int64{nq} @@ -1160,69 +1151,3 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m func (q *queryCollection) publishFailedQueryResult(msg msgstream.TsMsg, errMsg string) error { return q.publishFailedQueryResultWithCtx(q.releaseCtx, msg, errMsg) } - -// func (q *queryCollection) publishQueryResult(msg msgstream.TsMsg, collectionID UniqueID) error { -// span, ctx := trace.StartSpanFromContext(msg.TraceCtx()) -// defer span.Finish() -// msg.SetTraceCtx(ctx) -// msgPack := msgstream.MsgPack{} -// msgPack.Msgs = append(msgPack.Msgs, msg) -// err := q.queryResultMsgStream.Produce(&msgPack) -// if err != nil { -// log.Error(err.Error()) -// } -// -// return err -// } - -// func (q *queryCollection) publishFailedQueryResult(msg msgstream.TsMsg, errMsg string) error { -// msgType := msg.Type() -// span, ctx := trace.StartSpanFromContext(msg.TraceCtx()) -// defer span.Finish() -// msg.SetTraceCtx(ctx) -// msgPack := msgstream.MsgPack{} -// -// resultChannelInt := 0 -// baseMsg := msgstream.BaseMsg{ -// HashValues: []uint32{uint32(resultChannelInt)}, -// } -// baseResult := &commonpb.MsgBase{ -// MsgID: msg.ID(), -// Timestamp: msg.BeginTs(), -// SourceID: msg.SourceID(), -// } -// -// switch msgType { -// case commonpb.MsgType_Retrieve: -// retrieveMsg := msg.(*msgstream.RetrieveMsg) -// baseResult.MsgType = commonpb.MsgType_RetrieveResult -// retrieveResultMsg := &msgstream.RetrieveResultMsg{ -// BaseMsg: baseMsg, -// RetrieveResults: internalpb.RetrieveResults{ -// Base: baseResult, -// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg}, -// ResultChannelID: retrieveMsg.ResultChannelID, -// Ids: nil, -// FieldsData: nil, -// }, -// } -// msgPack.Msgs = append(msgPack.Msgs, retrieveResultMsg) -// case commonpb.MsgType_Search: -// searchMsg := msg.(*msgstream.SearchMsg) -// baseResult.MsgType = commonpb.MsgType_SearchResult -// searchResultMsg := &msgstream.SearchResultMsg{ -// BaseMsg: baseMsg, -// SearchResults: internalpb.SearchResults{ -// Base: baseResult, -// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg}, -// ResultChannelID: searchMsg.ResultChannelID, -// }, -// } -// msgPack.Msgs = append(msgPack.Msgs, searchResultMsg) -// default: -// return fmt.Errorf("publish invalid msgType %d", msgType) -// } -// -// return q.queryResultMsgStream.Produce(&msgPack) -// } -// diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 94f2147b0f..c8006d7362 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -149,7 +149,7 @@ func (node *QueryNode) initSession() error { node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeCfg.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodeCfg.QueryNodePort, 10), false, true) Params.QueryNodeCfg.SetNodeID(node.session.ServerID) Params.SetLogger(Params.QueryNodeCfg.GetNodeID()) - log.Debug("QueryNode", 
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.String("node address", node.session.Address)) + log.Info("QueryNode init session", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.String("node address", node.session.Address)) return nil } @@ -213,7 +213,7 @@ func (node *QueryNode) initServiceDiscovery() error { log.Warn("QueryNode failed to init service discovery", zap.Error(err)) return err } - log.Debug("QueryNode success to get Proxy sessions", zap.Any("sessions", sessions)) + log.Info("QueryNode success to get Proxy sessions", zap.Any("sessions", sessions)) nodes := make([]*NodeInfo, 0, len(sessions)) for _, session := range sessions { @@ -235,7 +235,7 @@ func (node *QueryNode) watchService(ctx context.Context) { for { select { case <-ctx.Done(): - log.Debug("watch service shutdown") + log.Info("watch service shutdown") return case event, ok := <-node.eventCh: if !ok { @@ -250,17 +250,12 @@ func (node *QueryNode) watchService(ctx context.Context) { } return } - if err := node.handleSessionEvent(ctx, event); err != nil { - log.Warn("handleSessionEvent", zap.Error(err)) - } + node.handleSessionEvent(ctx, event) } } } -func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error { - if event == nil { - return nil - } +func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) { info := &NodeInfo{ NodeID: event.Session.ServerID, Address: event.Session.Address, @@ -274,7 +269,6 @@ func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionuti log.Warn("receive unknown service event type", zap.Any("type", event.EventType)) } - return nil } // Init function init historical and streaming module to manage segments @@ -282,7 +276,7 @@ func (node *QueryNode) Init() error { var initError error = nil node.initOnce.Do(func() { //ctx := context.Background() - log.Debug("QueryNode session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath)) + log.Info("QueryNode session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath)) err := node.initSession() if err != nil { log.Error("QueryNode init session failed", zap.Error(err)) @@ -307,7 +301,7 @@ func (node *QueryNode) Init() error { } node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath) - log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) + log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) node.tSafeReplica = newTSafeReplica() streamingReplica := newCollectionReplica(node.etcdKV) @@ -349,7 +343,7 @@ func (node *QueryNode) Init() error { // node.factory, // qsOptWithSessionManager(node.sessionManager)) - log.Debug("query node init successfully", + log.Info("query node init successfully", zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()), zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP), zap.Any("Port", Params.QueryNodeCfg.QueryNodePort), @@ -385,7 +379,7 @@ func (node *QueryNode) Start() error { Params.QueryNodeCfg.UpdatedTime = time.Now() node.UpdateStateCode(internalpb.StateCode_Healthy) - log.Debug("query node start successfully", + log.Info("query node start successfully", zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()), zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP), zap.Any("Port", Params.QueryNodeCfg.QueryNodePort), @@ -395,6 +389,7 @@ func (node *QueryNode) Start() error { // Stop mainly stop QueryNode's query service, historical loop and streaming loop. 
+func newQueryService(ctx context.Context,
+	historical *historical,
+	streaming *streaming,
+	vectorStorage storage.ChunkManager,
+	cacheStorage storage.ChunkManager,
+	factory msgstream.Factory,
+	opts ...qsOpt,
+) *queryService {
+
+	queryServiceCtx, queryServiceCancel := context.WithCancel(ctx)
+
+	qs := &queryService{
+		ctx:    queryServiceCtx,
+		cancel: queryServiceCancel,
+
+		historical: historical,
+		streaming:  streaming,
+
+		queryCollections: make(map[UniqueID]*queryCollection),
+
+		vectorStorage: vectorStorage,
+		cacheStorage:  cacheStorage,
+		factory:       factory,
+	}
+
+	for _, opt := range opts {
+		opt(qs)
+	}
+
+	return qs
+}
+
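+// close releases every cached queryCollection and cancels the service context.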
+func (q *queryService) close() {
+	log.Info("search service closed")
+	q.queryCollectionMu.Lock()
+	for collectionID, sc := range q.queryCollections {
+		sc.close()
+		sc.cancel()
+		delete(q.queryCollections, collectionID)
+	}
+	q.queryCollections = make(map[UniqueID]*queryCollection)
+	q.queryCollectionMu.Unlock()
+	q.cancel()
+}
+
+func (q *queryService) addQueryCollection(collectionID UniqueID) error {
+	q.queryCollectionMu.Lock()
+	defer q.queryCollectionMu.Unlock()
+	if _, ok := q.queryCollections[collectionID]; ok {
+		log.Warn("query collection already exists", zap.Any("collectionID", collectionID))
+		err := errors.New(fmt.Sprintln("query collection already exists, collectionID = ", collectionID))
+		return err
+	}
+	ctx1, cancel := context.WithCancel(q.ctx)
+	qc, err := newQueryCollection(ctx1,
+		cancel,
+		collectionID,
+		q.historical,
+		q.streaming,
+		q.factory,
+		q.cacheStorage,
+		q.vectorStorage,
+		qcOptWithSessionManager(q.sessionManager),
+	)
+	if err != nil {
+		return err
+	}
+	q.queryCollections[collectionID] = qc
+	return nil
+}
+
+func (q *queryService) hasQueryCollection(collectionID UniqueID) bool {
+	q.queryCollectionMu.Lock()
+	defer q.queryCollectionMu.Unlock()
+	_, ok := q.queryCollections[collectionID]
+	return ok
+}
+
+func (q *queryService) getQueryCollection(collectionID UniqueID) (*queryCollection, error) {
+	q.queryCollectionMu.Lock()
+	defer q.queryCollectionMu.Unlock()
+	_, ok := q.queryCollections[collectionID]
+	if ok {
+		return q.queryCollections[collectionID], nil
+	}
+	return nil, errors.New(fmt.Sprintln("queryCollection not exists, collectionID = ", collectionID))
+}
+
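+// stopQueryCollection closes the queryCollection of the given collection and removes it from the cache.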
+func (q *queryService) stopQueryCollection(collectionID UniqueID) {
+	q.queryCollectionMu.Lock()
+	defer q.queryCollectionMu.Unlock()
+	sc, ok := q.queryCollections[collectionID]
+	if !ok {
+		log.Warn("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID))
+		return
+	}
+	sc.close()
+	sc.cancel()
+	// broadcast so waitNewTsafe does not block, which would prevent doUnsolvedMsg from quitting
+	sc.watcherCond.Broadcast()
+	delete(q.queryCollections, collectionID)
+}
diff --git a/internal/querynode/query_shard.go b/internal/querynode/query_shard.go
index 71d797a9b8..e477669503 100644
--- a/internal/querynode/query_shard.go
+++ b/internal/querynode/query_shard.go
@@ -179,10 +179,10 @@ func (q *queryShard) watchTs(channel <-chan bool, closeCh <-chan struct{}, tp ts
 	for {
 		select {
 		case <-q.ctx.Done():
-			log.Debug("stop queryShard watcher due to ctx done", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
+			log.Info("stop queryShard watcher due to ctx done", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
 			return
 		case <-closeCh:
-			log.Debug("stop queryShard watcher due to watcher closed", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
+			log.Info("stop queryShard watcher due to watcher closed", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
 			return
 		case _, ok := <-channel:
 			if !ok {
@@ -458,7 +458,6 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques
 	}
 
 	// reduce shard search results: unmarshal -> reduce -> marshal
-	log.Debug("shard leader get search results", zap.Int("numbers", len(results)))
 	searchResultData, err := decodeSearchResults(results)
 	if err != nil {
 		log.Warn("shard leader decode search results errors", zap.Error(err))
diff --git a/internal/querynode/query_shard_service.go b/internal/querynode/query_shard_service.go
index 37f2d0b042..0037b27527 100644
--- a/internal/querynode/query_shard_service.go
+++ b/internal/querynode/query_shard_service.go
@@ -23,8 +23,10 @@ import (
 	"strconv"
 	"sync"
 
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/dependency"
+	"go.uber.org/zap"
 )
 
 type queryShardService struct {
@@ -92,6 +94,7 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel
 		q.localCacheEnabled,
 	)
 	q.queryShards[channel] = qs
+	log.Info("Successfully add query shard", zap.Int64("collection", collectionID), zap.Int64("replica", replicaID), zap.String("channel", channel))
 	return nil
 }
 
@@ -102,6 +105,7 @@ func (q *queryShardService) removeQueryShard(channel Channel) error {
 		return errors.New(fmt.Sprintln("query shard(channel) ", channel, " does not exist"))
 	}
 	delete(q.queryShards, channel)
+	log.Info("Successfully remove query shard", zap.String("channel", channel))
 	return nil
 }
 
@@ -122,6 +126,7 @@ func (q *queryShardService) getQueryShard(channel Channel) (*queryShard, error)
 }
 
 func (q *queryShardService) close() {
+	log.Warn("Close query shard service")
 	q.cancel()
 	q.queryShardsMu.Lock()
 	defer q.queryShardsMu.Unlock()
@@ -161,4 +166,5 @@ func (q *queryShardService) releaseCollection(collectionID int64) {
 		}
 	}
 	q.queryShardsMu.Unlock()
+	log.Info("release collection in query shard service", zap.Int64("collectionId", collectionID))
 }
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index d5beb0895f..3497268337 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -199,7 +199,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
 		return nil, err
 	}
 
-	log.Debug("create segment",
+	log.Info("create segment",
 		zap.Int64("collectionID", collectionID),
 		zap.Int64("partitionID", partitionID),
 		zap.Int64("segmentID", segmentID),
@@ -236,7 +236,7 @@ func deleteSegment(segment *Segment) {
 	C.DeleteSegment(cPtr)
 	segment.segmentPtr = nil
 
-	log.Debug("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID()))
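+			// unsupported primary key type; the bloom filter is left unchanged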
segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID())) + log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID())) segment = nil } @@ -345,6 +345,7 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult) metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + log.Debug("do retrieve on segment", zap.Int64("segmentID", s.segmentID), zap.Int32("segmentType", int32(s.segmentType))) if err := HandleCStatus(&status, "Retrieve failed"); err != nil { return nil, err } @@ -570,7 +571,7 @@ func (s *Segment) updateBloomFilter(pks []primaryKey) { stringValue := pk.(*varCharPrimaryKey).Value s.pkFilter.AddString(stringValue) default: - //TODO:: + log.Warn("failed to update bloomfilter", zap.Any("PK type", pk.Type())) } } } @@ -759,7 +760,7 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche return err } - log.Debug("load field done", + log.Info("load field done", zap.Int64("fieldID", fieldID), zap.Int64("row count", rowCount), zap.Int64("segmentID", s.ID())) @@ -827,7 +828,7 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps return err } - log.Debug("load deleted record done", + log.Info("load deleted record done", zap.Int64("row count", rowCount), zap.Int64("segmentID", s.ID())) return nil @@ -861,7 +862,7 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F return err } - log.Debug("updateSegmentIndex done", zap.Int64("segmentID", s.ID())) + log.Info("updateSegmentIndex done", zap.Int64("segmentID", s.ID())) return nil } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index b8857d6f9e..3408689bec 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -24,10 +24,6 @@ import ( "runtime" "strconv" "sync" - "time" - - "github.com/panjf2000/ants/v2" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -45,10 +41,10 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" ) -const timeoutForEachRead = 10 * time.Second - // segmentLoader is only responsible for loading the field data from binlog type segmentLoader struct { historicalReplica ReplicaInterface @@ -113,9 +109,9 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme return err } - log.Debug("segmentLoader start loading...", - zap.Int64("collectionID", req.CollectionID), - zap.Int("numOfSegments", len(req.Infos)), + log.Info("segmentLoader start loading...", + zap.Any("collectionID", req.CollectionID), + zap.Any("numOfSegments", len(req.Infos)), zap.Any("loadType", segmentType), ) // check memory limit @@ -230,7 +226,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment, collectionID := loadInfo.CollectionID partitionID := loadInfo.PartitionID segmentID := loadInfo.SegmentID - log.Debug("start loading segment data into memory", + log.Info("start 
+	log.Info("segmentLoader start loading...",
+		zap.Int64("collectionID", req.CollectionID),
+		zap.Int("numOfSegments", len(req.Infos)),
 		zap.Any("loadType", segmentType),
 	)
 	// check memory limit
@@ -230,7 +226,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 	collectionID := loadInfo.CollectionID
 	partitionID := loadInfo.PartitionID
 	segmentID := loadInfo.SegmentID
-	log.Debug("start loading segment data into memory",
+	log.Info("start loading segment data into memory",
 		zap.Int64("collectionID", collectionID),
 		zap.Int64("partitionID", partitionID),
 		zap.Int64("segmentID", segmentID))
@@ -283,7 +279,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 	if pkFieldID == common.InvalidFieldID {
 		log.Warn("segment primary key field doesn't exist when load segment")
 	} else {
-		log.Debug("loading bloom filter...")
+		log.Debug("loading bloom filter...", zap.Int64("segmentID", segmentID))
 		pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkFieldID)
 		err = loader.loadSegmentBloomFilter(segment, pkStatsBinlogs)
 		if err != nil {
@@ -291,7 +287,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 		}
 	}
 
-	log.Debug("loading delta...")
+	log.Debug("loading delta...", zap.Int64("segmentID", segmentID))
 	err = loader.loadDeltaLogs(segment, loadInfo.Deltalogs)
 	return err
 }
@@ -476,7 +472,7 @@ func (loader *segmentLoader) loadGrowingSegments(segment *Segment,
 		return errors.New(fmt.Sprintln("illegal insert data when load segment, collectionID = ", segment.collectionID))
 	}
 
-	log.Debug("start load growing segments...",
+	log.Info("start load growing segments...",
 		zap.Any("collectionID", segment.collectionID),
 		zap.Any("segmentID", segment.ID()),
 		zap.Any("numRows", len(ids)),
@@ -516,7 +512,7 @@ func (loader *segmentLoader) loadGrowingSegments(segment *Segment,
 	if err != nil {
 		return err
 	}
-	log.Debug("Do insert done in segment loader", zap.Int("len", numOfRecords), zap.Int64("segmentID", segment.ID()), zap.Int64("collectionID", segment.collectionID))
+	log.Info("Do insert done for growing segment", zap.Int("len", numOfRecords), zap.Int64("segmentID", segment.ID()), zap.Int64("collectionID", segment.collectionID))
 	return nil
 }
 
@@ -608,7 +604,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
 }
 
 func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition) error {
-	log.Debug("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
+	log.Info("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
 	stream, err := loader.factory.NewMsgStream(ctx)
 	if err != nil {
 		return err
@@ -629,7 +625,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 	}
 
 	if lastMsgID.AtEarliestPosition() {
-		log.Debug("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
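+		// the channel's last message is still at the earliest position, so
+		// there are no delta messages to replay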
+		log.Info("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
 		return nil
 	}
 
@@ -646,7 +642,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 		deleteOffset: make(map[UniqueID]int64),
 	}
 
-	log.Debug("start read delta msg from seek position to last position",
+	log.Info("start read delta msg from seek position to last position",
 		zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
 	hasMore := true
 	for hasMore {
@@ -692,12 +688,12 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 		}
 	}
 
-	log.Debug("All data has been read, there is no more data", zap.Int64("Collection ID", collectionID),
+	log.Info("All data has been read, there is no more data", zap.Int64("Collection ID", collectionID),
 		zap.String("channel", pChannelName), zap.Any("msg id", position.GetMsgID()))
 	for segmentID, pks := range delData.deleteIDs {
 		segment, err := loader.historicalReplica.getSegmentByID(segmentID)
 		if err != nil {
-			log.Debug(err.Error())
+			log.Warn(err.Error())
 			continue
 		}
 		offset := segment.segmentPreDelete(len(pks))
@@ -710,7 +706,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 		go deletePk(loader.historicalReplica, delData, segmentID, &wg)
 	}
 	wg.Wait()
-	log.Debug("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
+	log.Info("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
 	return nil
 }
diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go
index 618b0c5268..312433cb56 100644
--- a/internal/querynode/shard_cluster.go
+++ b/internal/querynode/shard_cluster.go
@@ -168,6 +168,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
 }
 
 func (sc *ShardCluster) Close() {
+	log.Info("Close shard cluster")
 	sc.closeOnce.Do(func() {
 		sc.state.Store(int32(unavailable))
 		close(sc.closeCh)
@@ -176,7 +177,7 @@ func (sc *ShardCluster) Close() {
 
 // addNode add a node into cluster
 func (sc *ShardCluster) addNode(evt nodeEvent) {
-	log.Debug("ShardCluster add node", zap.Int64("nodeID", evt.nodeID))
+	log.Info("ShardCluster add node", zap.Int64("nodeID", evt.nodeID))
 	sc.mut.Lock()
 	defer sc.mut.Unlock()
 
@@ -198,6 +199,7 @@ func (sc *ShardCluster) addNode(evt nodeEvent) {
 
 // removeNode handles node offline and setup related segments
 func (sc *ShardCluster) removeNode(evt nodeEvent) {
+	log.Info("ShardCluster remove node", zap.Int64("nodeID", evt.nodeID))
 	sc.mut.Lock()
 	defer sc.mut.Unlock()
 
@@ -220,8 +222,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
 
 // updateSegment apply segment change to shard cluster
 func (sc *ShardCluster) updateSegment(evt segmentEvent) {
-	log.Debug("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
-
+	log.Info("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
 	// notify handoff wait online if any
 	defer func() {
 		sc.segmentCond.L.Lock()
@@ -248,6 +249,7 @@ func (sc *ShardCluster) updateSegment(evt segmentEvent) {
 
 // SyncSegments synchronize segment distribution in batch
 func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) {
+	log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state)))
 	// notify handoff wait online if any
 	defer func() {
 		sc.segmentCond.L.Lock()
@@ -312,7 +314,7 @@ func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent)
 // removeSegment removes segment from cluster
 // should only applied in hand-off or load balance procedure
 func (sc *ShardCluster) removeSegment(evt segmentEvent) {
-	log.Debug("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
+	log.Info("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
 
 	sc.mut.Lock()
 	defer sc.mut.Unlock()
@@ -680,6 +682,7 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest)
 	wg.Wait()
 
 	if err != nil {
+		log.Error(err.Error())
 		return nil, err
 	}
 
@@ -735,6 +738,7 @@ func (sc *ShardCluster) Query(ctx context.Context, req *querypb.QueryRequest) ([
 	wg.Wait()
 
 	if err != nil {
+		log.Error(err.Error())
 		return nil, err
 	}
 
diff --git a/internal/querynode/shard_cluster_service.go b/internal/querynode/shard_cluster_service.go
index a36c3dc9dc..1fe05401fc 100644
--- a/internal/querynode/shard_cluster_service.go
+++ b/internal/querynode/shard_cluster_service.go
@@ -8,11 +8,13 @@ import (
 	"sync"
 
 	grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
 )
 
 const (
@@ -75,6 +77,7 @@ func (s *ShardClusterService) addShardCluster(collectionID, replicaID int64, vch
 	})
 
 	s.clusters.Store(vchannelName, cs)
+	log.Info("successfully add shard cluster", zap.Int64("collectionID", collectionID), zap.Int64("replica", replicaID), zap.String("vchan", vchannelName))
 }
 
 // getShardCluster gets shardCluster of specified vchannel if exists.
@@ -107,6 +110,7 @@ func (s *ShardClusterService) releaseCollection(collectionID int64) {
 		}
 		return true
 	})
+	log.Info("successfully release collection", zap.Int64("collectionID", collectionID))
 }
 
 // HandoffSegments dispatch segmentChangeInfo to related shardClusters
@@ -124,6 +128,7 @@ func (s *ShardClusterService) HandoffSegments(collectionID int64, info *querypb.
 		return true
 	})
 	wg.Wait()
+	log.Info("successfully handoff segments", zap.Int64("collectionID", collectionID))
 }
 
 // SyncReplicaSegments dispatches nodeID segments distribution to ShardCluster.
@@ -134,7 +139,7 @@ func (s *ShardClusterService) SyncReplicaSegments(vchannelName string, distribut
 	}
 
 	sc.SyncSegments(distribution, segmentStateLoaded)
-
+	log.Info("successfully sync segments", zap.String("channel", vchannelName), zap.Any("distribution", distribution))
 	return nil
 }
 
@@ -146,5 +151,11 @@ func (s *ShardClusterService) HandoffVChannelSegments(vchannel string, info *que
 		return nil
 	}
 	sc := raw.(*ShardCluster)
-	return sc.HandoffSegments(info)
+	err := sc.HandoffSegments(info)
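+	// report the outcome before propagating the error to the caller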
+	if err != nil {
+		log.Warn("failed to handoff", zap.String("channel", vchannel), zap.Any("segment", info), zap.Error(err))
+	} else {
+		log.Info("successfully handoff", zap.String("channel", vchannel), zap.Any("segment", info))
+	}
+	return err
 }
diff --git a/internal/querynode/shard_node_detector.go b/internal/querynode/shard_node_detector.go
index d9826fbd52..7030ad6efe 100644
--- a/internal/querynode/shard_node_detector.go
+++ b/internal/querynode/shard_node_detector.go
@@ -65,7 +65,7 @@ func (nd *etcdShardNodeDetector) Close() {
 
 // watchNodes lists current online nodes and returns a channel for incoming events.
 func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) {
-	log.Debug("nodeDetector watch", zap.Int64("collectionID", collectionID), zap.Int64("replicaID", replicaID), zap.String("vchannelName", vchannelName))
+	log.Info("nodeDetector watch", zap.Int64("collectionID", collectionID), zap.Int64("replicaID", replicaID), zap.String("vchannelName", vchannelName))
 	resp, err := nd.client.Get(context.Background(), nd.path, clientv3.WithPrefix())
 	if err != nil {
 		log.Warn("Etcd NodeDetector get replica info failed", zap.Error(err))
@@ -74,7 +74,7 @@ func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64,
 
 	idAddr, err := nd.idAddr()
 	if err != nil {
-		log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
+		log.Error("Etcd NodeDetector session map failed", zap.Error(err))
 		panic(err)
 	}
 
@@ -121,7 +121,6 @@ func (nd *etcdShardNodeDetector) cancelClose(cancel func()) {
 }
 
 func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID int64) {
-	log.Debug("etcdNodeDetector start watch")
 	defer nd.wg.Done()
 	for {
 		select {
@@ -171,7 +170,7 @@ func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID,
 
 	idAddr, err := nd.idAddr()
 	if err != nil {
-		log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
+		log.Error("Etcd NodeDetector session map failed", zap.Error(err))
 		panic(err)
 	}
 	// all node is added
@@ -247,7 +246,7 @@ func (nd *etcdShardNodeDetector) handleDelEvent(e *clientv3.Event, collectionID,
 	}
 	idAddr, err := nd.idAddr()
 	if err != nil {
-		log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
+		log.Error("Etcd NodeDetector session map failed", zap.Error(err))
 		panic(err)
 	}
 	for _, id := range prevInfo.GetNodeIds() {
diff --git a/internal/querynode/shard_segment_detector.go b/internal/querynode/shard_segment_detector.go
index 562c57a238..930e4139fe 100644
--- a/internal/querynode/shard_segment_detector.go
+++ b/internal/querynode/shard_segment_detector.go
@@ -71,13 +71,13 @@ func (sd *etcdShardSegmentDetector) getCtx() context.Context {
 }
 
 func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) {
-	log.Debug("segmentDetector start watch", zap.Int64("collectionID", collectionID),
+	log.Info("segmentDetector start watch", zap.Int64("collectionID", collectionID),
 		zap.Int64("replicaID", replicaID),
 		zap.String("vchannelName", vchannelName),
 		zap.String("rootPath", sd.path))
 	resp, err := sd.client.Get(context.Background(), sd.path, clientv3.WithPrefix())
 	if err != nil {
-		log.Warn("Etcd SegmentDetector get replica info failed", zap.Error(err))
+		log.Error("Etcd SegmentDetector get replica info failed", zap.Error(err))
 		panic(err)
 	}
 
@@ -112,7 +112,6 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID
 
 func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string) {
 	defer sd.wg.Done()
-	log.Debug("etcdSegmentDetector start watch")
 	for {
 		select {
 		case <-sd.closeCh:
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index ff20b8939f..a89fa6179d 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -54,7 +54,7 @@ func (sService *statsService) start() {
 	statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
 	statsStream.AsProducer(producerChannels)
-	log.Debug("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels))
+	log.Info("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels))
 
 	var statsMsgStream msgstream.MsgStream = statsStream
 
@@ -65,7 +65,7 @@ func (sService *statsService) start() {
 	for {
 		select {
 		case <-sService.ctx.Done():
-			log.Debug("stats service closed")
+			log.Info("stats service closed")
 			return
 		case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond):
 			sService.publicStatistic(nil)
@@ -103,6 +103,6 @@ func (sService *statsService) publicStatistic(fieldStats []*internalpb.FieldStat
 	}
 	err := sService.statsStream.Produce(&msgPack)
 	if err != nil {
-		log.Error(err.Error())
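+		// publishing stats is best-effort: log the failure and keep the loop running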
succeed", zap.Strings("channels", producerChannels)) + log.Info("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels)) var statsMsgStream msgstream.MsgStream = statsStream @@ -65,7 +65,7 @@ func (sService *statsService) start() { for { select { case <-sService.ctx.Done(): - log.Debug("stats service closed") + log.Info("stats service closed") return case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond): sService.publicStatistic(nil) @@ -103,6 +103,6 @@ func (sService *statsService) publicStatistic(fieldStats []*internalpb.FieldStat } err := sService.statsStream.Produce(&msgPack) if err != nil { - log.Error(err.Error()) + log.Warn("failed to publish stats", zap.Error(err)) } } diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index 6eb80b03c5..a6b5b1ac5c 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -129,6 +129,7 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs return searchResults, searchSegmentIDs, searchPartIDs, nil } if err != nil { + log.Error(err.Error()) return searchResults, searchSegmentIDs, searchPartIDs, err } log.Debug("no partition specified, search all partitions", diff --git a/internal/querynode/task.go b/internal/querynode/task.go index ed2a65434c..56d4373482 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -128,7 +128,7 @@ func (r *addQueryChannelTask) PreExecute(ctx context.Context) error { } func (r *addQueryChannelTask) Execute(ctx context.Context) error { - log.Debug("Execute addQueryChannelTask", + log.Info("Execute addQueryChannelTask", zap.Any("collectionID", r.req.CollectionID)) collectionID := r.req.CollectionID @@ -137,7 +137,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { } qc := r.node.queryShardService.getQueryChannel(collectionID) - log.Debug("add query channel for collection", zap.Int64("collectionID", collectionID)) + log.Info("add query channel for collection", zap.Int64("collectionID", collectionID)) consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.GetNodeID()) @@ -154,9 +154,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { }*/ qc.Start() - log.Debug("start query channel", zap.Int64("collectionID", collectionID)) - - log.Debug("addQueryChannelTask done", + log.Info("addQueryChannelTask done", zap.Any("collectionID", r.req.CollectionID), ) return nil @@ -218,7 +216,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID)) } - log.Debug("Starting WatchDmChannels ...", + log.Info("Starting WatchDmChannels ...", zap.String("collectionName", w.req.Schema.Name), zap.Int64("collectionID", collectionID), zap.Int64("replicaID", w.req.GetReplicaID()), @@ -277,15 +275,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { w.node.streaming.replica.addPartition(collectionID, partitionID) } - log.Debug("loading growing segments in WatchDmChannels...", + log.Info("loading growing segments in WatchDmChannels...", zap.Int64("collectionID", collectionID), zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), ) err := w.node.loader.loadSegment(req, segmentTypeGrowing) if err != nil { + log.Warn(err.Error()) return err } - log.Debug("successfully load growing segments done in WatchDmChannels", + log.Info("successfully load growing 
 		zap.Int64("collectionID", collectionID),
 		zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
 	)
@@ -312,7 +311,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		info.SeekPosition.MsgGroup = consumeSubName
 		channel2SeekPosition[info.ChannelName] = info.SeekPosition
 	}
-	log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
+	log.Info("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
 
 	// add excluded segments for unFlushed segments,
 	// unFlushed segments before check point should be filtered out.
@@ -325,7 +324,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	for i := 0; i < len(unFlushedCheckPointInfos); i++ {
 		unflushedSegmentIDs = append(unflushedSegmentIDs, unFlushedCheckPointInfos[i].GetID())
 	}
-	log.Debug("watchDMChannel, add check points info for unFlushed segments done",
+	log.Info("watchDMChannel, add check points info for unFlushed segments done",
 		zap.Int64("collectionID", collectionID),
 		zap.Any("unflushedSegmentIDs", unflushedSegmentIDs),
 	)
@@ -345,7 +344,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		}
 	}
 	w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos)
-	log.Debug("watchDMChannel, add check points info for flushed segments done",
+	log.Info("watchDMChannel, add check points info for flushed segments done",
 		zap.Int64("collectionID", collectionID),
 		zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
 	)
@@ -365,7 +364,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		}
 	}
 	w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
-	log.Debug("watchDMChannel, add check points info for dropped segments done",
+	log.Info("watchDMChannel, add check points info for dropped segments done",
 		zap.Int64("collectionID", collectionID),
 		zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
 	)
@@ -376,7 +375,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		log.Warn("watchDMChannel, add flowGraph for dmChannels failed", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels), zap.Error(err))
 		return err
 	}
-	log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
+	log.Info("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
 
 	// channels as consumer
 	for channel, fg := range channel2FlowGraph {
@@ -414,7 +413,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		return err
 	}
 
-	log.Debug("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
+	log.Info("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
 
 	sCol.addVChannels(vChannels)
 	sCol.addPChannels(pChannels)
@@ -423,7 +422,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	hCol.addVChannels(vChannels)
 	hCol.addPChannels(pChannels)
 	hCol.setLoadType(lType)
-	log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
+	log.Info("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
 
 	// create tSafe
 	for _, channel := range vChannels {
@@ -452,7 +451,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		fg.flowGraph.Start()
 	}
 
-	log.Debug("WatchDmChannels done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
+	log.Info("WatchDmChannels done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
 	return nil
 }
 
@@ -498,7 +497,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 		VPDeltaChannels[v] = p
 		vChannel2SeekPosition[v] = info.SeekPosition
 	}
-	log.Debug("Starting WatchDeltaChannels ...",
+	log.Info("Starting WatchDeltaChannels ...",
 		zap.Any("collectionID", collectionID),
 		zap.Any("vDeltaChannels", vDeltaChannels),
 		zap.Any("pChannels", pDeltaChannels),
@@ -506,7 +505,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 	if len(VPDeltaChannels) != len(vDeltaChannels) {
 		return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
 	}
-	log.Debug("Get physical channels done",
+	log.Info("Get physical channels done",
 		zap.Any("collectionID", collectionID),
 	)
 
@@ -559,7 +558,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 		return err
 	}
 
-	log.Debug("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
+	log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
 
 	//set collection replica
 	hCol.addVDeltaChannels(vDeltaChannels)
@@ -600,7 +599,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 		fg.flowGraph.Start()
 	}
 
-	log.Debug("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
+	log.Info("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
 	return nil
 }
 
@@ -632,7 +631,7 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
 
 func (l *loadSegmentsTask) Execute(ctx context.Context) error {
 	// TODO: support db
-	log.Debug("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID))
+	log.Info("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID))
 	var err error
 
 	// init meta
@@ -656,7 +655,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
 		return err
 	}
 
-	log.Debug("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID))
+	log.Info("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID))
 	return nil
 }
 
@@ -695,12 +694,11 @@ const (
 )
 
 func (r *releaseCollectionTask) Execute(ctx context.Context) error {
-	log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
-	log.Debug("release streaming", zap.Any("collectionID", r.req.CollectionID))
+	log.Info("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
 	// sleep to wait for query tasks done
 	const gracefulReleaseTime = 1
 	time.Sleep(gracefulReleaseTime * time.Second)
-	log.Debug("Starting release collection...",
+	log.Info("Starting release collection...",
 		zap.Any("collectionID", r.req.CollectionID),
 	)
 
@@ -712,7 +710,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
 	}
 
 	// remove collection metas in streaming and historical
-	log.Debug("release historical", zap.Any("collectionID", r.req.CollectionID))
+	log.Info("release historical", zap.Any("collectionID", r.req.CollectionID))
 	err = r.releaseReplica(r.node.historical.replica, replicaHistorical)
 	if err != nil {
 		return fmt.Errorf("release collection failed, collectionID = %d, err = %s", r.req.CollectionID, err)
 	}
collectionID = %d, err = %s", r.req.CollectionID, err) @@ -720,7 +718,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { debug.FreeOSMemory() - log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) + log.Info("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) return nil } @@ -730,7 +728,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica return err } // set release time - log.Debug("set release time", zap.Any("collectionID", r.req.CollectionID)) + log.Info("set release time", zap.Any("collectionID", r.req.CollectionID)) collection.setReleaseTime(r.req.Base.Timestamp) // remove all flow graphs of the target collection @@ -746,7 +744,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica // remove all tSafes of the target collection for _, channel := range channels { - log.Debug("Releasing tSafe in releaseCollectionTask...", + log.Info("Releasing tSafe in releaseCollectionTask...", zap.Any("collectionID", r.req.CollectionID), zap.Any("vDeltaChannel", channel), ) @@ -789,7 +787,7 @@ func (r *releasePartitionsTask) PreExecute(ctx context.Context) error { } func (r *releasePartitionsTask) Execute(ctx context.Context) error { - log.Debug("Execute release partition task", + log.Info("Execute release partition task", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionIDs", r.req.PartitionIDs)) @@ -806,7 +804,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { if err != nil { return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err) } - log.Debug("start release partition", zap.Any("collectionID", r.req.CollectionID)) + log.Info("start release partition", zap.Any("collectionID", r.req.CollectionID)) for _, id := range r.req.PartitionIDs { // remove partition from streaming and historical @@ -828,7 +826,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { } } - log.Debug("Release partition task done", + log.Info("Release partition task done", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionIDs", r.req.PartitionIDs)) return nil diff --git a/internal/querynode/task_queue.go b/internal/querynode/task_queue.go index 5dcdc3152c..d87d4d28cf 100644 --- a/internal/querynode/task_queue.go +++ b/internal/querynode/task_queue.go @@ -141,7 +141,7 @@ func (queue *baseTaskQueue) PopActiveTask(tID UniqueID) task { delete(queue.activeTasks, tID) return t } - log.Debug("queryNode", zap.Int64("cannot found ID in the active task list!", tID)) + log.Info("queryNode", zap.Int64("cannot found ID in the active task list!", tID)) return nil } diff --git a/internal/querynode/tsafe_replica.go b/internal/querynode/tsafe_replica.go index 669d8ca708..cccb6c3c12 100644 --- a/internal/querynode/tsafe_replica.go +++ b/internal/querynode/tsafe_replica.go @@ -77,11 +77,11 @@ func (t *tSafeReplica) addTSafe(vChannel Channel) { defer t.mu.Unlock() if _, ok := t.tSafes[vChannel]; !ok { t.tSafes[vChannel] = newTSafe(vChannel) - log.Debug("add tSafe done", + log.Info("add tSafe done", zap.String("channel", vChannel), ) } else { - log.Debug("tSafe has been existed", + log.Info("tSafe has been existed", zap.String("channel", vChannel), ) } @@ -91,7 +91,7 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) { t.mu.Lock() defer t.mu.Unlock() - log.Debug("remove tSafe replica", + log.Info("remove tSafe replica", zap.String("vChannel", vChannel), ) delete(t.tSafes, vChannel)