From 63e08606e276ae5beb784f1b46103e90e2bd54b6 Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 25 Aug 2022 19:34:54 +0800 Subject: [PATCH] Fix load segment hangs forever (#18814) Caused if the context is timeout Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querynode/segment_loader.go | 14 +++++++--- internal/querynode/segment_loader_test.go | 31 ++++++++++++++++------- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 396b1009ff..350c48508e 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -50,6 +50,10 @@ const ( requestConcurrencyLevelLimit = 8 ) +var ( + ErrReadDeltaMsgFailed = errors.New("ReadDeltaMsgFailed") +) + // segmentLoader is only responsible for loading the field data from binlog type segmentLoader struct { metaReplica ReplicaInterface @@ -689,11 +693,15 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection for hasMore { select { case <-ctx.Done(): - break + return ctx.Err() case msgPack, ok := <-stream.Chan(): if !ok { - log.Warn("fail to read delta msg", zap.String("pChannelName", pChannelName), zap.Any("msg id", position.GetMsgID()), zap.Error(err)) - return err + log.Warn("fail to read delta msg", + zap.String("pChannelName", pChannelName), + zap.ByteString("msgID", position.GetMsgID()), + zap.Error(err), + ) + return fmt.Errorf("%w: pChannelName=%v, msgID=%v", ErrReadDeltaMsgFailed, pChannelName, position.GetMsgID()) } if msgPack == nil { diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index 2a7a87aa89..e55d074ac0 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -22,12 +22,14 @@ import ( "math/rand" "runtime" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -598,7 +600,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg := &mockMsgID{} mockMsg.On("AtEarliestPosition").Return(true, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) } // test already reach latest position @@ -606,7 +608,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg := &mockMsgID{} mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(true, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) } //test consume after seeking when get last msg successfully @@ -615,7 +617,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) } //test compare msgID failed when get last msg successfully @@ -624,7 +626,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New("")) - assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg)) + assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) } //test consume after seeking when get last msg failed @@ -633,7 +635,18 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New("")) - assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, mockMsg)) + assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, mockMsg)) + } + + //test context timeout when reading stream + { + log.Debug("test context timeout when reading stream") + mockMsg := &mockMsgID{} + mockMsg.On("AtEarliestPosition").Return(false, nil) + mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(-time.Second)) + defer cancel() + assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, mockMsg), context.DeadlineExceeded) } } @@ -655,7 +668,7 @@ func testSeekFailWhenConsumingDeltaMsg(ctx context.Context, t *testing.T, positi assert.EqualError(t, ret, errMsg) } -func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, mockMsg *mockMsgID) error { +func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, hasData bool, mockMsg *mockMsgID) error { msgStream := &LoadDeleteMsgStream{} msgStream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string")) msgStream.On("Seek", mock.AnythingOfType("string")).Return(nil) @@ -666,13 +679,13 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea msgStream.On("GetLatestMsgID", mock.AnythingOfType("string")).Return(mockMsg, errors.New("")) } - msgChan := make(chan *msgstream.MsgPack) - go func() { + msgChan := make(chan *msgstream.MsgPack, 10) + if hasData { msgChan <- nil deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength) deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}} - }() + } msgStream.On("Chan").Return(msgChan) factory := &mockMsgStreamFactory{mockMqStream: msgStream}