diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index 7f17774d6a..4bc25d7a04 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -43,7 +43,7 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msMsg, ok := in[0].(*MsgStreamMsg) if !ok { - log.Error("type assertion failed for MsgStreamMsg") + log.Warn("type assertion failed for MsgStreamMsg") // TODO: add error handling } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index e509ea7b14..c50000f1a3 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -46,7 +46,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msgStreamMsg, ok := in[0].(*MsgStreamMsg) if !ok { - log.Error("type assertion failed for MsgStreamMsg") + log.Warn("type assertion failed for MsgStreamMsg") // TODO: add error handling } diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index 634978f1dc..ee83ea0eac 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -37,7 +37,7 @@ func (gcNode *gcNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { _, ok := in[0].(*gcMsg) if !ok { - log.Error("type assertion failed for gcMsg") + log.Warn("type assertion failed for gcMsg") // TODO: add error handling } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 7682682984..2d143fd52c 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -51,7 +51,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { iMsg, ok := in[0].(*insertMsg) if !ok { - log.Error("type assertion failed for insertMsg") + log.Warn("type assertion failed for insertMsg") // TODO: add error handling } diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index d87019af9a..85c165a96e 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -49,7 +49,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { serviceTimeMsg, ok := in[0].(*serviceTimeMsg) if !ok { - log.Error("type assertion failed for serviceTimeMsg") + log.Warn("type assertion failed for serviceTimeMsg") // TODO: add error handling } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index aef4252cf3..a7d51271ea 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -156,7 +156,7 @@ func (q *queryCollection) waitNewTSafe() Timestamp { // block until any vChannel updating tSafe _, _, recvOK := reflect.Select(q.watcherSelectCase) if !recvOK { - log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID)) + //log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID)) return Timestamp(math.MaxInt64) } //log.Debug("wait new tSafe", zap.Any("collectionID", s.collectionID)) @@ -202,13 +202,13 @@ func (q *queryCollection) consumeQuery() { default: msgPack := q.queryMsgStream.Consume() if msgPack == nil || len(msgPack.Msgs) <= 0 { - msgPackNil := msgPack == nil - msgPackEmpty := true - if msgPack != nil { - msgPackEmpty = len(msgPack.Msgs) <= 0 - } - log.Debug("consume query message failed", zap.Any("msgPack is Nil", msgPackNil), - zap.Any("msgPackEmpty", msgPackEmpty)) + //msgPackNil := msgPack == nil + //msgPackEmpty := true + //if msgPack != nil { + // msgPackEmpty = len(msgPack.Msgs) <= 0 + //} + //log.Debug("consume query message failed", zap.Any("msgPack is Nil", msgPackNil), + // zap.Any("msgPackEmpty", msgPackEmpty)) continue } for _, msg := range msgPack.Msgs { @@ -267,28 +267,28 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) { case commonpb.MsgType_Retrieve: collectionID = msg.(*msgstream.RetrieveMsg).CollectionID msgTypeStr = "retrieve" - log.Debug("consume retrieve message", - zap.Any("collectionID", collectionID), - zap.Int64("msgID", msg.ID()), - ) + //log.Debug("consume retrieve message", + // zap.Any("collectionID", collectionID), + // zap.Int64("msgID", msg.ID()), + //) case commonpb.MsgType_Search: collectionID = msg.(*msgstream.SearchMsg).CollectionID msgTypeStr = "search" - log.Debug("consume search message", - zap.Any("collectionID", collectionID), - zap.Int64("msgID", msg.ID()), - ) + //log.Debug("consume search message", + // zap.Any("collectionID", collectionID), + // zap.Int64("msgID", msg.ID()), + //) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) log.Error(err.Error()) return } if collectionID != q.collectionID { - log.Error("not target collection query request", - zap.Any("collectionID", q.collectionID), - zap.Int64("target collectionID", collectionID), - zap.Int64("msgID", msg.ID()), - ) + //log.Error("not target collection query request", + // zap.Any("collectionID", q.collectionID), + // zap.Int64("target collectionID", collectionID), + // zap.Int64("msgID", msg.ID()), + //) return } diff --git a/internal/querynode/reduce.go b/internal/querynode/reduce.go index 083565a1f8..90e5da4582 100644 --- a/internal/querynode/reduce.go +++ b/internal/querynode/reduce.go @@ -22,7 +22,6 @@ package querynode import "C" import ( "errors" - "fmt" "strconv" "sync" "unsafe" @@ -61,7 +60,7 @@ func reduceSearchResults(searchResults []*SearchResult, numSegments int64, inRed func fillTargetEntry(plan *SearchPlan, searchResults []*SearchResult, matchedSegments []*Segment, inReduced []bool) error { wg := &sync.WaitGroup{} - fmt.Println(inReduced) + //fmt.Println(inReduced) for i := range inReduced { if inReduced[i] { wg.Add(1) diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index e97a52110f..07f18d2cce 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -58,7 +58,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): wg.Done() - fmt.Println(nodeCtx.node.Name(), "closed") + //fmt.Println(nodeCtx.node.Name(), "closed") return default: // inputs from inputsMessages for Operate