diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 463074f503..f374673c17 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -165,9 +165,10 @@ func (q *queryCollection) addToUnsolvedMsg(msg queryMsg) { func (q *queryCollection) popAllUnsolvedMsg() []queryMsg { q.unsolvedMsgMu.Lock() defer q.unsolvedMsgMu.Unlock() - tmp := q.unsolvedMsg + ret := make([]queryMsg, 0, len(q.unsolvedMsg)) + ret = append(ret, q.unsolvedMsg...) q.unsolvedMsg = q.unsolvedMsg[:0] - return tmp + return ret } func (q *queryCollection) waitNewTSafe() Timestamp { diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index 6632c19051..1faa089635 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -447,3 +447,36 @@ func TestResultHandlerStage_TranslateHits(t *testing.T) { assert.Error(t, err) }) } + +func TestQueryCollection_AddPopUnsolvedMsg(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + qCollection, err := genSimpleQueryCollection(ctx, cancel) + assert.Nil(t, err) + var i int64 + for i = 0; i < 3; i++ { + qCollection.addToUnsolvedMsg(&msgstream.RetrieveMsg{ + RetrieveRequest: internalpb.RetrieveRequest{ + Base: &commonpb.MsgBase{MsgID: i}, + }, + }) + } + + unsolved := qCollection.popAllUnsolvedMsg() + assert.EqualValues(t, 3, len(unsolved)) + for i := 0; i < 3; i++ { + assert.EqualValues(t, i, unsolved[i].ID()) + } + + // add new msg to unsolved msgs and check old unsolved msg + for i := 0; i < 3; i++ { + qCollection.addToUnsolvedMsg(&msgstream.RetrieveMsg{ + RetrieveRequest: internalpb.RetrieveRequest{ + Base: &commonpb.MsgBase{MsgID: 4}, + }, + }) + } + + for i := 0; i < 3; i++ { + assert.EqualValues(t, i, unsolved[i].ID()) + } +}