From a4ea2fb18a6c67810725ce5eeef0e7baff44d4c3 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 24 May 2022 21:11:59 +0800 Subject: [PATCH] Handle errors in DataNode and QueryNode flow graph (#17096) Signed-off-by: bigsheeper --- internal/datanode/compactor_test.go | 8 + internal/datanode/flow_graph_dd_node.go | 26 +- internal/datanode/flow_graph_dd_node_test.go | 49 ++- internal/datanode/flow_graph_delete_node.go | 40 ++- .../datanode/flow_graph_delete_node_test.go | 37 +++ .../datanode/flow_graph_insert_buffer_node.go | 67 ++-- .../flow_graph_insert_buffer_node_test.go | 38 ++- internal/datanode/flow_graph_node.go | 7 +- internal/datanode/flow_graph_time_ticker.go | 8 +- internal/querynode/data_sync_service.go | 35 +++ internal/querynode/data_sync_service_test.go | 205 +++++++++---- internal/querynode/flow_graph_delete_node.go | 37 ++- .../querynode/flow_graph_delete_node_test.go | 20 +- .../flow_graph_filter_delete_node.go | 35 ++- .../flow_graph_filter_delete_node_test.go | 82 +++-- .../querynode/flow_graph_filter_dm_node.go | 102 +++--- .../flow_graph_filter_dm_node_test.go | 136 +++++--- internal/querynode/flow_graph_insert_node.go | 134 ++++---- .../querynode/flow_graph_insert_node_test.go | 290 ++++++++++-------- .../querynode/flow_graph_service_time_node.go | 9 +- .../flow_graph_service_time_node_test.go | 8 +- internal/querynode/meta_replica.go | 9 +- internal/querynode/mock_test.go | 2 +- internal/querynode/segment.go | 2 +- internal/querynode/segment_test.go | 2 +- internal/querynode/task_test.go | 9 +- 26 files changed, 947 insertions(+), 450 deletions(-) diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 79122c1805..eca31f81de 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -18,6 +18,7 @@ package datanode import ( "context" + "fmt" "math" "testing" "time" @@ -862,15 +863,22 @@ func TestCompactorInterfaceMethods(t *testing.T) { type mockFlushManager struct { 
sleepSeconds int32 + returnError bool } var _ flushManager = (*mockFlushManager)(nil) func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error { + if mfm.returnError { + return fmt.Errorf("mock error") + } return nil } func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error { + if mfm.returnError { + return fmt.Errorf("mock error") + } return nil } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index b176fae9fc..7819746a02 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -19,9 +19,13 @@ package datanode import ( "context" "fmt" + "reflect" "sync" "sync/atomic" + "github.com/opentracing/opentracing-go" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -30,9 +34,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" - "github.com/opentracing/opentracing-go" - "go.uber.org/zap" ) // make sure ddNode implements flowgraph.Node @@ -55,6 +58,7 @@ var _ flowgraph.Node = (*ddNode)(nil) type ddNode struct { BaseNode + ctx context.Context collectionID UniqueID segID2SegInfo sync.Map // segment ID to *SegmentInfo @@ -81,7 +85,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { msMsg, ok := in[0].(*MsgStreamMsg) if !ok { - log.Warn("Type assertion failed for MsgStreamMsg") + if in[0] == nil { + log.Debug("type assertion failed for MsgStreamMsg because it's nil") + } else { + log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + } return []Msg{} } @@ 
-163,12 +171,13 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg) } } - err := ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax()) + err := retry.Do(ddn.ctx, func() error { + return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax()) + }, flowGraphRetryOpt) if err != nil { - // TODO: proper deal with error - log.Warn("DDNode forward delete msg failed", - zap.String("vChannelName", ddn.vchannelName), - zap.Error(err)) + err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vchannelName, err) + log.Error(err.Error()) + panic(err) } fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) @@ -301,6 +310,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI deltaMsgStream.Start() dd := &ddNode{ + ctx: ctx, BaseNode: baseNode, collectionID: collID, flushedSegments: fs, diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index a1d4e2a994..f6f8f699fd 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "github.com/milvus-io/milvus/internal/util/retry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -148,7 +150,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := factory.NewMsgStream(context.Background()) assert.Nil(t, err) + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) + deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) ddn := ddNode{ + ctx: context.Background(), collectionID: test.ddnCollID, deltaMsgStream: deltaStream, vchannelName: "ddn_drop_msg", @@ -208,8 +213,11 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := 
factory.NewMsgStream(context.Background()) assert.Nil(t, err) + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) + deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) // Prepare ddNode states ddn := ddNode{ + ctx: context.Background(), flushedSegments: []*datapb.SegmentInfo{fs}, collectionID: test.ddnCollID, deltaMsgStream: deltaStream, @@ -254,15 +262,21 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := factory.NewMsgStream(context.Background()) assert.Nil(t, err) + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) + deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) // Prepare ddNode states ddn := ddNode{ + ctx: context.Background(), collectionID: test.ddnCollID, deltaMsgStream: deltaStream, } // Prepare delete messages var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{ - BaseMsg: msgstream.BaseMsg{EndTimestamp: test.MsgEndTs}, + BaseMsg: msgstream.BaseMsg{ + EndTimestamp: test.MsgEndTs, + HashValues: []uint32{0}, + }, DeleteRequest: internalpb.DeleteRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}, CollectionID: test.inMsgCollID, @@ -277,6 +291,39 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { }) } }) + + to.Run("Test forwardDeleteMsg failed", func(te *testing.T) { + factory := dependency.NewDefaultFactory(true) + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(to, err) + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) + // Prepare ddNode states + ddn := ddNode{ + ctx: context.Background(), + collectionID: 1, + deltaMsgStream: deltaStream, + } + + // Prepare delete messages + var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{ + BaseMsg: msgstream.BaseMsg{ + EndTimestamp: 2000, + HashValues: []uint32{0}, + }, + DeleteRequest: internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}, + CollectionID: 1, + }, + } + tsMessages := []msgstream.TsMsg{dMsg} + var msgStreamMsg Msg 
= flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) + + // Test + flowGraphRetryOpt = retry.Attempts(1) + assert.Panics(te, func() { + ddn.Operate([]Msg{msgStreamMsg}) + }) + }) } func TestFlowGraph_DDNode_filterMessages(te *testing.T) { diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 2fab0755b2..13156d090d 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -20,8 +20,10 @@ import ( "context" "fmt" "math" + "reflect" "sync" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" @@ -31,8 +33,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" - "github.com/opentracing/opentracing-go" ) type ( @@ -43,6 +45,8 @@ type ( // DeleteNode is to process delete msg, flush delete info into storage. type deleteNode struct { BaseNode + + ctx context.Context channelName string delBuf sync.Map // map[segmentID]*DelDataBuf replica Replica @@ -129,9 +133,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er rows := len(pks) tss, ok := segIDToTss[segID] if !ok || rows != len(tss) { - // TODO: what's the expected behavior after this Error? 
- log.Error("primary keys and timestamp's element num mis-match") - continue + return fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID) } var delDataBuf *DelDataBuf @@ -202,8 +204,12 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { fgMsg, ok := in[0].(*flowGraphMsg) if !ok { - log.Warn("type assertion failed for flowGraphMsg") - return nil + if in[0] == nil { + log.Debug("type assertion failed for flowGraphMsg because it's nil") + } else { + log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + } + return []Msg{} } var spans []opentracing.Span @@ -217,8 +223,12 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { traceID, _, _ := trace.InfoFromSpan(spans[i]) log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID)) - if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil { - log.Error("buffer delete msg failed", zap.Error(err)) + err := dn.bufferDeleteMsg(msg, fgMsg.timeRange) + if err != nil { + // error occurs only when deleteMsg is misaligned, should not happen + err = fmt.Errorf("buffer delete msg failed, err = %s", err) + log.Error(err.Error()) + panic(err) } } @@ -238,13 +248,16 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { // no related delta data to flush, send empty buf to complete flush life-cycle dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0]) } else { - err := dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0]) + err := retry.Do(dn.ctx, func() error { + return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0]) + }, flowGraphRetryOpt) if err != nil { - log.Warn("Failed to flush delete data", zap.Error(err)) - } else { - // remove delete buf - dn.delBuf.Delete(segmentToFlush) + err = fmt.Errorf("failed to flush delete data, err = %s", err) + log.Error(err.Error()) + panic(err) } + // remove delete buf + dn.delBuf.Delete(segmentToFlush) } } } 
@@ -301,6 +314,7 @@ func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, conf baseNode.SetMaxParallelism(config.maxParallelism) return &deleteNode{ + ctx: ctx, BaseNode: baseNode, delBuf: sync.Map{}, diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 157a3d36cd..b16055ced9 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/retry" + "github.com/bits-and-blooms/bloom/v3" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -207,6 +209,8 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { "Invalid input length == 3"}, {[]Msg{&flowGraphMsg{}}, "Invalid input length == 1 but input message is not msgStreamMsg"}, + {[]Msg{&flowgraph.MsgStreamMsg{}}, + "Invalid input length == 1 but input message is not flowGraphMsg"}, } for _, test := range invalidInTests { @@ -388,4 +392,37 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { case <-sig: } }) + + t.Run("Test deleteNode Operate flushDelData failed", func(te *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + chanName := "datanode-test-FlowGraphDeletenode-operate" + testPath := "/test/datanode/root/meta" + assert.NoError(t, clearEtcd(testPath)) + Params.EtcdCfg.MetaRootPath = testPath + Params.DataNodeCfg.DeleteBinlogRootPath = testPath + + c := &nodeConfig{ + replica: &mockReplica{}, + allocator: NewAllocatorFactory(), + vChannelName: chanName, + } + delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + assert.Nil(te, err) + + msg := genFlowGraphDeleteMsg(int64Pks, chanName) + msg.segmentsToFlush = []UniqueID{-1} + delNode.delBuf.Store(UniqueID(-1), &DelDataBuf{}) + delNode.flushManager = &mockFlushManager{ + returnError: true, + } + + var fgMsg flowgraph.Msg 
= &msg + + flowGraphRetryOpt = retry.Attempts(1) + assert.Panics(te, func() { + delNode.Operate([]flowgraph.Msg{fgMsg}) + }) + }) } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 1765bcabd0..1103db9b37 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "reflect" "sync" "github.com/golang/protobuf/proto" @@ -36,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" ) @@ -50,6 +52,8 @@ type ( type insertBufferNode struct { BaseNode + + ctx context.Context channelName string insertBuffer sync.Map // SegmentID to BufferData replica Replica @@ -161,8 +165,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { fgMsg, ok := in[0].(*flowGraphMsg) if !ok { - log.Warn("type assertion failed for flowGraphMsg") - ibNode.Close() + if in[0] == nil { + log.Debug("type assertion failed for flowGraphMsg because it's nil") + } else { + log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + } return []Msg{} } @@ -192,12 +199,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } if startPositions[0].Timestamp < ibNode.lastTimestamp { - log.Error("insert buffer node consumed old messages", - zap.String("channel", ibNode.channelName), - zap.Any("timestamp", startPositions[0].Timestamp), - zap.Any("lastTimestamp", ibNode.lastTimestamp), - ) - return []Msg{} + // message stream should guarantee that this should not happen + err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d", + ibNode.channelName, 
startPositions[0].Timestamp, ibNode.lastTimestamp) + log.Error(err.Error()) + panic(err) } ibNode.lastTimestamp = endPositions[0].Timestamp @@ -205,15 +211,20 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { // Updating segment statistics in replica seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0]) if err != nil { - log.Warn("update segment states in Replica wrong", zap.Error(err)) - return []Msg{} + // Occurs only if the collectionID is mismatch, should not happen + err = fmt.Errorf("update segment states in Replica wrong, err = %s", err) + log.Error(err.Error()) + panic(err) } // insert messages -> buffer for _, msg := range fgMsg.insertMessages { err := ibNode.bufferInsertMsg(msg, endPositions[0]) if err != nil { - log.Warn("msg to buffer failed", zap.Error(err)) + // error occurs when missing schema info or data is misaligned, should not happen + err = fmt.Errorf("insertBufferNode msg to buffer failed, err = %s", err) + log.Error(err.Error()) + panic(err) } } @@ -341,30 +352,35 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } for _, task := range flushTaskList { - err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0]) + err = retry.Do(ibNode.ctx, func() error { + return ibNode.flushManager.flushBufferData(task.buffer, + task.segmentID, + task.flushed, + task.dropped, + endPositions[0]) + }, flowGraphRetryOpt) if err != nil { - log.Warn("failed to invoke flushBufferData", zap.Error(err)) metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc() + metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc() if task.auto { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc() + 
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc() } - } else { - segmentsToFlush = append(segmentsToFlush, task.segmentID) - ibNode.insertBuffer.Delete(task.segmentID) - metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc() - if task.auto { - metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc() - } + err = fmt.Errorf("insertBufferNode flushBufferData failed, err = %s", err) + log.Error(err.Error()) + panic(err) } + segmentsToFlush = append(segmentsToFlush, task.segmentID) + ibNode.insertBuffer.Delete(task.segmentID) + metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc() metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc() if task.auto { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc() + metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc() } } - if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload); err != nil { - log.Error("send hard time tick into pulsar channel failed", zap.Error(err)) - } + ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload) res := flowGraphMsg{ deleteMessages: fgMsg.deleteMessages, @@ -495,11 +511,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } // writeHardTimeTick writes timetick once insertBufferNode operates. 
-func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) error { +func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) { ibNode.ttLogger.LogTs(ts) ibNode.ttMerger.bufferTs(ts, segmentIDs) - return nil } + func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) { return ibNode.replica.getCollectionAndPartitionID(segmentID) } @@ -558,6 +574,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl }) return &insertBufferNode{ + ctx: ctx, BaseNode: baseNode, insertBuffer: sync.Map{}, diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 7a497168c5..8571e9c095 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -23,6 +23,10 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/retry" + + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -199,6 +203,34 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { inMsg = genFlowGraphInsertMsg(insertChannelName) inMsg.dropCollection = true assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) }) + + // test consume old message + inMsg = genFlowGraphInsertMsg(insertChannelName) + timestampBak := iBNode.lastTimestamp + iBNode.lastTimestamp = typeutil.MaxTimestamp + assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) }) + iBNode.lastTimestamp = timestampBak + + // test updateSegStatesInReplica failed + inMsg = genFlowGraphInsertMsg(insertChannelName) + inMsg.insertMessages[0].CollectionID = UniqueID(-1) + inMsg.insertMessages[0].SegmentID = UniqueID(-1) + assert.NoError(t, err) + assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) }) + + // test bufferInsertMsg 
failed + inMsg = genFlowGraphInsertMsg(insertChannelName) + inMsg.insertMessages[0].Timestamps = []Timestamp{1, 2} + inMsg.insertMessages[0].RowIDs = []int64{1} + assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) }) + + // test flushBufferData failed + flowGraphRetryOpt = retry.Attempts(1) + inMsg = genFlowGraphInsertMsg(insertChannelName) + iBNode.flushManager = &mockFlushManager{returnError: true} + iBNode.insertBuffer.Store(inMsg.insertMessages[0].SegmentID, &BufferData{}) + assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) }) + iBNode.flushManager = fm } /* @@ -464,7 +496,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { for _, im := range fgm.segmentsToFlush { // send del done signal - fm.flushDelData(nil, im, fgm.endPositions[0]) + err = fm.flushDelData(nil, im, fgm.endPositions[0]) + assert.NoError(t, err) } wg.Wait() require.Equal(t, 0, len(colRep.newSegments)) @@ -573,7 +606,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { wg.Add(len(fgm.segmentsToFlush)) for _, im := range fgm.segmentsToFlush { // send del done signal - fm.flushDelData(nil, im, fgm.endPositions[0]) + err = fm.flushDelData(nil, im, fgm.endPositions[0]) + assert.NoError(t, err) } wg.Wait() require.Equal(t, 0, len(colRep.newSegments)) diff --git a/internal/datanode/flow_graph_node.go b/internal/datanode/flow_graph_node.go index aa01738084..c1649982ba 100644 --- a/internal/datanode/flow_graph_node.go +++ b/internal/datanode/flow_graph_node.go @@ -16,7 +16,10 @@ package datanode -import "github.com/milvus-io/milvus/internal/util/flowgraph" +import ( + "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/retry" +) type ( // Node is flowgraph.Node @@ -28,3 +31,5 @@ type ( // InputNode is flowgraph.InputNode InputNode = flowgraph.InputNode ) + +var flowGraphRetryOpt = retry.Attempts(5) diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go 
index 495c6bf9af..eb2f2231bd 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -19,6 +19,10 @@ package datanode import ( "sync" "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" ) type sendTimeTick func(Timestamp, []int64) error @@ -118,7 +122,9 @@ func (mt *mergedTimeTickerSender) work() { lastTs = mt.ts mt.lastSent = time.Now() - mt.send(mt.ts, sids) + if err := mt.send(mt.ts, sids); err != nil { + log.Error("send hard time tick failed", zap.Error(err)) + } } mt.mu.Unlock() } diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index dc317d4bf2..e2c7bcd2fd 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -42,11 +42,42 @@ type dataSyncService struct { msFactory msgstream.Factory } +// checkReplica used to check replica info before init flow graph, it's a private method of dataSyncService +func (dsService *dataSyncService) checkReplica(collectionID UniqueID) error { + // check if the collection exists + hisColl, err := dsService.historicalReplica.getCollectionByID(collectionID) + if err != nil { + return err + } + strColl, err := dsService.streamingReplica.getCollectionByID(collectionID) + if err != nil { + return err + } + if hisColl.getLoadType() != strColl.getLoadType() { + return fmt.Errorf("inconsistent loadType of collection, collectionID = %d", collectionID) + } + for _, channel := range hisColl.getVChannels() { + if _, err := dsService.tSafeReplica.getTSafe(channel); err != nil { + return fmt.Errorf("getTSafe failed, err = %s", err) + } + } + for _, channel := range hisColl.getVDeltaChannels() { + if _, err := dsService.tSafeReplica.getTSafe(channel); err != nil { + return fmt.Errorf("getTSafe failed, err = %s", err) + } + } + return nil +} + // addFlowGraphsForDMLChannels add flowGraphs to dmlChannel2FlowGraph func (dsService *dataSyncService) 
addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) (map[string]*queryNodeFlowGraph, error) { dsService.mu.Lock() defer dsService.mu.Unlock() + if err := dsService.checkReplica(collectionID); err != nil { + return nil, err + } + results := make(map[string]*queryNodeFlowGraph) for _, channel := range dmlChannels { if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok { @@ -87,6 +118,10 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni dsService.mu.Lock() defer dsService.mu.Unlock() + if err := dsService.checkReplica(collectionID); err != nil { + return nil, err + } + results := make(map[string]*queryNodeFlowGraph) for _, channel := range deltaChannels { if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok { diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 91f7e81aac..937650f47a 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -21,6 +21,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/schemapb" ) func TestDataSyncService_DMLFlowGraphs(t *testing.T) { @@ -40,41 +42,53 @@ func TestDataSyncService_DMLFlowGraphs(t *testing.T) { dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac) assert.NotNil(t, dataSyncService) - _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) - assert.NoError(t, err) - assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1) + t.Run("test DMLFlowGraphs", func(t *testing.T) { + _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1) - _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) - assert.NoError(t, err) - assert.Len(t, 
dataSyncService.dmlChannel2FlowGraph, 1) + _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1) - fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) - assert.NotNil(t, fg) - assert.NoError(t, err) + fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) + assert.NotNil(t, fg) + assert.NoError(t, err) - fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel") - assert.Nil(t, fg) - assert.Error(t, err) + err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) + assert.NoError(t, err) - err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) - assert.NoError(t, err) + dataSyncService.removeFlowGraphsByDMLChannels([]Channel{defaultDMLChannel}) + assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0) - err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel") - assert.Error(t, err) + fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) + assert.Nil(t, fg) + assert.Error(t, err) - dataSyncService.removeFlowGraphsByDMLChannels([]Channel{defaultDMLChannel}) - assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0) + _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1) - fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel) - assert.Nil(t, fg) - assert.Error(t, err) + dataSyncService.close() + assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0) + }) - _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) - assert.NoError(t, err) - assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 
1) + t.Run("test DMLFlowGraphs invalid channel", func(t *testing.T) { + fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel") + assert.Nil(t, fg) + assert.Error(t, err) - dataSyncService.close() - assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0) + err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel") + assert.Error(t, err) + }) + + t.Run("test addFlowGraphsForDMLChannels checkReplica Failed", func(t *testing.T) { + err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID) + assert.NoError(t, err) + _, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.Error(t, err) + dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64)) + }) } func TestDataSyncService_DeltaFlowGraphs(t *testing.T) { @@ -94,39 +108,126 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) { dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac) assert.NotNil(t, dataSyncService) - _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}) - assert.NoError(t, err) - assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) + t.Run("test DeltaFlowGraphs", func(t *testing.T) { + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) - _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}) - assert.NoError(t, err) - assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) - fg, err := 
dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel) - assert.NotNil(t, fg) - assert.NoError(t, err) + fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel) + assert.NotNil(t, fg) + assert.NoError(t, err) - fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, "invalid-vChannel") - assert.Nil(t, fg) - assert.Error(t, err) + err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, defaultDeltaChannel) + assert.NoError(t, err) - err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, defaultDeltaChannel) - assert.NoError(t, err) + dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel}) + assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0) - err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, "invalid-vChannel") - assert.Error(t, err) + fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel) + assert.Nil(t, fg) + assert.Error(t, err) - dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel}) - assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0) + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.NoError(t, err) + assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) - fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel) - assert.Nil(t, fg) - assert.Error(t, err) + dataSyncService.close() + assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0) + }) - _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}) - assert.NoError(t, err) - assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) + t.Run("test DeltaFlowGraphs invalid channel", func(t *testing.T) { + fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, "invalid-vChannel") + assert.Nil(t, fg) 
+ assert.Error(t, err) - dataSyncService.close() - assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0) + err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, "invalid-vChannel") + assert.Error(t, err) + }) + + t.Run("test addFlowGraphsForDeltaChannels checkReplica Failed", func(t *testing.T) { + err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID) + assert.NoError(t, err) + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}) + assert.Error(t, err) + dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64)) + }) +} + +func TestDataSyncService_checkReplica(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamingReplica, err := genSimpleReplica() + assert.NoError(t, err) + + historicalReplica, err := genSimpleReplica() + assert.NoError(t, err) + + fac := genFactory() + assert.NoError(t, err) + + tSafe := newTSafeReplica() + dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac) + assert.NotNil(t, dataSyncService) + defer dataSyncService.close() + + t.Run("test checkReplica", func(t *testing.T) { + err = dataSyncService.checkReplica(defaultCollectionID) + assert.NoError(t, err) + }) + + t.Run("test collection doesn't exist", func(t *testing.T) { + err = dataSyncService.streamingReplica.removeCollection(defaultCollectionID) + assert.NoError(t, err) + err = dataSyncService.checkReplica(defaultCollectionID) + assert.Error(t, err) + + err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID) + assert.NoError(t, err) + err = dataSyncService.checkReplica(defaultCollectionID) + assert.Error(t, err) + + coll := dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64)) + assert.NotNil(t, coll) + coll = 
dataSyncService.streamingReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64)) + assert.NotNil(t, coll) + }) + + t.Run("test different loadType", func(t *testing.T) { + coll, err := dataSyncService.historicalReplica.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + coll.setLoadType(loadTypePartition) + + err = dataSyncService.checkReplica(defaultCollectionID) + assert.Error(t, err) + + coll, err = dataSyncService.streamingReplica.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + coll.setLoadType(loadTypePartition) + }) + + t.Run("test cannot find tSafe", func(t *testing.T) { + coll, err := dataSyncService.historicalReplica.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + coll.addVDeltaChannels([]Channel{defaultDeltaChannel}) + coll.addVChannels([]Channel{defaultDMLChannel}) + + dataSyncService.tSafeReplica.addTSafe(defaultDeltaChannel) + dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel) + + dataSyncService.tSafeReplica.removeTSafe(defaultDeltaChannel) + err = dataSyncService.checkReplica(defaultCollectionID) + assert.Error(t, err) + + dataSyncService.tSafeReplica.removeTSafe(defaultDMLChannel) + err = dataSyncService.checkReplica(defaultCollectionID) + assert.Error(t, err) + + dataSyncService.tSafeReplica.addTSafe(defaultDeltaChannel) + dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel) + }) } diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index e91cca0dad..ebccbd841e 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -17,6 +17,7 @@ package querynode import ( + "fmt" "reflect" "sync" @@ -96,7 +97,13 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ) if dNode.replica.getSegmentNum() != 0 { - processDeleteMessages(dNode.replica, delMsg, delData) + err := processDeleteMessages(dNode.replica, delMsg, delData) + if err != nil { + 
// error occurs when missing meta info or unexpected pk type, should not happen + err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err) + log.Error(err.Error()) + panic(err) + } } } @@ -104,8 +111,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID, pks := range delData.deleteIDs { segment, err := dNode.replica.getSegmentByID(segmentID) if err != nil { - log.Debug("failed to get segment", zap.Int64("segmentId", segmentID), zap.Error(err)) - continue + // should not happen, segment should be created before + err = fmt.Errorf("deleteNode getSegmentByID failed, err = %s", err) + log.Error(err.Error()) + panic(err) } offset := segment.segmentPreDelete(len(pks)) delData.deleteOffset[segmentID] = offset @@ -114,8 +123,17 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // 3. do delete wg := sync.WaitGroup{} for segmentID := range delData.deleteOffset { + segmentID := segmentID wg.Add(1) - go dNode.delete(delData, segmentID, &wg) + go func() { + err := dNode.delete(delData, segmentID, &wg) + if err != nil { + // error occurs when segment cannot be found, calling cgo function delete failed and etc... 
+ err = fmt.Errorf("segment delete failed, segmentID = %d, err = %s", segmentID, err) + log.Error(err.Error()) + panic(err) + } + }() } wg.Wait() @@ -130,16 +148,15 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // delete will do delete operation at segment which id is segmentID -func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) { +func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) error { defer wg.Done() targetSegment, err := dNode.replica.getSegmentByID(segmentID) if err != nil { - log.Error(err.Error()) - return + return fmt.Errorf("getSegmentByID failed, err = %s", err) } if targetSegment.segmentType != segmentTypeSealed { - return + return fmt.Errorf("unexpected segmentType when delete, segmentID = %d, segmentType = %s", segmentID, targetSegment.segmentType.String()) } ids := deleteData.deleteIDs[segmentID] @@ -148,11 +165,11 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg * err = targetSegment.segmentDelete(offset, ids, timestamps) if err != nil { - log.Warn("delete segment data failed", zap.Int64("segmentID", segmentID), zap.Error(err)) - return + return fmt.Errorf("segmentDelete failed, segmentID = %d", segmentID) } log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.Any("SegmentType", targetSegment.segmentType)) + return nil } // newDeleteNode returns a new deleteNode diff --git a/internal/querynode/flow_graph_delete_node_test.go b/internal/querynode/flow_graph_delete_node_test.go index 8ec2f9f6b8..57187101b8 100644 --- a/internal/querynode/flow_graph_delete_node_test.go +++ b/internal/querynode/flow_graph_delete_node_test.go @@ -46,7 +46,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - deleteNode.delete(deleteData, defaultSegmentID, wg) + err = deleteNode.delete(deleteData, defaultSegmentID, 
wg) + assert.NoError(t, err) }) t.Run("test segment delete error", func(t *testing.T) { @@ -67,7 +68,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2] - deleteNode.delete(deleteData, defaultSegmentID, wg) + err = deleteNode.delete(deleteData, defaultSegmentID, wg) + assert.Error(t, err) }) t.Run("test no target segment", func(t *testing.T) { @@ -76,7 +78,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { deleteNode := newDeleteNode(historical) wg := &sync.WaitGroup{} wg.Add(1) - deleteNode.delete(nil, defaultSegmentID, wg) + err = deleteNode.delete(nil, defaultSegmentID, wg) + assert.Error(t, err) }) t.Run("test invalid segmentType", func(t *testing.T) { @@ -93,7 +96,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - deleteNode.delete(&deleteData{}, defaultSegmentID, wg) + err = deleteNode.delete(&deleteData{}, defaultSegmentID, wg) + assert.Error(t, err) }) } @@ -178,7 +182,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { }, } msg := []flowgraph.Msg{&dMsg} - deleteNode.Operate(msg) + assert.Panics(t, func() { + deleteNode.Operate(msg) + }) }) t.Run("test partition not exist", func(t *testing.T) { @@ -202,7 +208,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { }, } msg := []flowgraph.Msg{&dMsg} - deleteNode.Operate(msg) + assert.Panics(t, func() { + deleteNode.Operate(msg) + }) }) t.Run("test invalid input length", func(t *testing.T) { diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index 5683c78c75..779b867f36 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -81,12 +81,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for _, msg := range 
msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_Delete: - resMsg := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + if err != nil { + // error occurs when missing meta info or data is misaligned, should not happen + err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err) + log.Error(err.Error()) + panic(err) + } if resMsg != nil { dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg) } default: - log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type()))) + log.Warn("invalid message type in filterDeleteNode", zap.String("message type", msg.Type().String())) } } var res Msg = &dMsg @@ -97,11 +103,9 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // filterInvalidDeleteMessage would filter invalid delete messages -func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg { +func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) { if err := msg.CheckAligned(); err != nil { - // TODO: what if the messages are misaligned? 
Here, we ignore those messages and print error - log.Warn("misaligned delete messages detected", zap.Error(err)) - return nil + return nil, fmt.Errorf("CheckAligned failed, err = %s", err) } sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) @@ -109,16 +113,29 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet defer sp.Finish() if msg.CollectionID != fddNode.collectionID { - return nil + return nil, nil } if len(msg.Timestamps) <= 0 { log.Debug("filter invalid delete message, no message", zap.Any("collectionID", msg.CollectionID), zap.Any("partitionID", msg.PartitionID)) - return nil + return nil, nil } - return msg + + // check if collection exists + col, err := fddNode.replica.getCollectionByID(msg.CollectionID) + if err != nil { + // QueryNode should add collection before start flow graph + return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID) + } + if col.getLoadType() == loadTypePartition { + if !fddNode.replica.hasPartition(msg.PartitionID) { + // filter out msg which not belongs to the loaded partitions + return nil, nil + } + } + return msg, nil } // newFilteredDeleteNode returns a new filterDeleteNode diff --git a/internal/querynode/flow_graph_filter_delete_node_test.go b/internal/querynode/flow_graph_filter_delete_node_test.go index 285d383266..4a0aecc681 100644 --- a/internal/querynode/flow_graph_filter_delete_node_test.go +++ b/internal/querynode/flow_graph_filter_delete_node_test.go @@ -17,7 +17,6 @@ package querynode import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -28,7 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, error) { +func getFilterDeleteNode() (*filterDeleteNode, error) { historical, err := genSimpleReplica() if err != nil { return nil, err @@ -39,61 +38,77 @@ func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, 
error) { } func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) fg.Name() } func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - t.Run("delete valid test", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() + assert.NoError(t, err) + res, err := fg.filterInvalidDeleteMessage(msg) assert.NoError(t, err) - res := fg.filterInvalidDeleteMessage(msg) assert.NotNil(t, res) }) t.Run("test delete no collection", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msg.CollectionID = UniqueID(1003) - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) - res := fg.filterInvalidDeleteMessage(msg) + fg.collectionID = UniqueID(1003) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.Error(t, err) assert.Nil(t, res) + fg.collectionID = defaultCollectionID }) t.Run("test delete not target collection", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test delete no data", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) msg.Int64PrimaryKeys = 
make([]IntPrimaryKey, 0) - res := fg.filterInvalidDeleteMessage(msg) + msg.PrimaryKeys = &schemapb.IDs{} + msg.NumRows = 0 + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{}) - res = fg.filterInvalidDeleteMessage(msg) + res, err = fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) + assert.Nil(t, res) + }) + + t.Run("test not target partition", func(t *testing.T) { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + fg, err := getFilterDeleteNode() + assert.NoError(t, err) + col, err := fg.replica.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + col.setLoadType(loadTypePartition) + err = fg.replica.removePartition(defaultPartitionID) + assert.NoError(t, err) + + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) } func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - genFilterDeleteMsg := func() []flowgraph.Msg { dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) @@ -102,7 +117,7 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) { t.Run("valid test", func(t *testing.T) { msg := genFilterDeleteMsg() - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) res := fg.Operate(msg) assert.NotNil(t, res) @@ -110,11 +125,34 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) { t.Run("invalid input length", func(t *testing.T) { msg := genFilterDeleteMsg() - fg, err := getFilterDeleteNode(ctx) + fg, err := getFilterDeleteNode() assert.NoError(t, err) var m flowgraph.Msg msg = append(msg, m) res := fg.Operate(msg) assert.NotNil(t, res) }) + + t.Run("filterInvalidDeleteMessage failed", func(t *testing.T) { + dMsg := 
genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + dMsg.NumRows = 0 + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) + fg, err := getFilterDeleteNode() + assert.NoError(t, err) + m := []flowgraph.Msg{msg} + assert.Panics(t, func() { + fg.Operate(m) + }) + }) + + t.Run("invalid msgType", func(t *testing.T) { + iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(schemapb.DataType_Int64), defaultDelLength) + assert.NoError(t, err) + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil) + + fg, err := getFilterDeleteNode() + assert.NoError(t, err) + res := fg.Operate([]flowgraph.Msg{msg}) + assert.NotNil(t, res) + }) } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 60798fbfa9..4564f73619 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -84,17 +84,29 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID)) switch msg.Type() { case commonpb.MsgType_Insert: - resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) + resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) + if err != nil { + // error occurs when missing meta info or data is misaligned, should not happen + err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err) + log.Error(err.Error()) + panic(err) + } if resMsg != nil { iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } case commonpb.MsgType_Delete: - resMsg := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + if err != nil { + // error occurs when missing meta info or data is misaligned, should not happen + err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", 
err) + log.Error(err.Error()) + panic(err) + } if resMsg != nil { iMsg.deleteMessages = append(iMsg.deleteMessages, resMsg) } default: - log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type()))) + log.Warn("invalid message type in filterDmNode", zap.String("message type", msg.Type().String())) } } @@ -106,11 +118,16 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // filterInvalidDeleteMessage would filter out invalid delete messages -func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg { +func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) { if err := msg.CheckAligned(); err != nil { - // TODO: what if the messages are misaligned? Here, we ignore those messages and print error - log.Warn("misaligned delete messages detected", zap.Error(err)) - return nil + return nil, fmt.Errorf("CheckAligned failed, err = %s", err) + } + + if len(msg.Timestamps) <= 0 { + log.Debug("filter invalid delete message, no message", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil, nil } sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) @@ -118,41 +135,37 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg defer sp.Finish() if msg.CollectionID != fdmNode.collectionID { - return nil + // filter out msg which not belongs to the current collection + return nil, nil } - // check if collection and partition exist + // check if collection exist col, err := fdmNode.replica.getCollectionByID(msg.CollectionID) if err != nil { - log.Debug("filter invalid delete message, collection does not exist", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil + // QueryNode should add collection before start flow graph + return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", 
msg.CollectionID) } if col.getLoadType() == loadTypePartition { if !fdmNode.replica.hasPartition(msg.PartitionID) { - log.Debug("filter invalid delete message, partition does not exist", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil + // filter out msg which not belongs to the loaded partitions + return nil, nil } } - if len(msg.Timestamps) <= 0 { - log.Debug("filter invalid delete message, no message", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil - } - return msg + return msg, nil } // filterInvalidInsertMessage would filter out invalid insert messages -func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg { +func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) (*msgstream.InsertMsg, error) { if err := msg.CheckAligned(); err != nil { - // TODO: what if the messages are misaligned? Here, we ignore those messages and print error - log.Warn("Error, misaligned insert messages detected") - return nil + return nil, fmt.Errorf("CheckAligned failed, err = %s", err) + } + + if len(msg.Timestamps) <= 0 { + log.Debug("filter invalid insert message, no message", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil, nil } sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) @@ -164,23 +177,19 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg //log.Debug("filter invalid insert message, collection is not the target collection", // zap.Any("collectionID", msg.CollectionID), // zap.Any("partitionID", msg.PartitionID)) - return nil + return nil, nil } - // check if collection and partition exist + // check if collection exists col, err := fdmNode.replica.getCollectionByID(msg.CollectionID) if err != nil { - log.Debug("filter invalid insert message, collection does not exist", - zap.Any("collectionID", 
msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil + // QueryNode should add collection before start flow graph + return nil, fmt.Errorf("filter invalid insert message, collection does not exist, collectionID = %d", msg.CollectionID) } if col.getLoadType() == loadTypePartition { if !fdmNode.replica.hasPartition(msg.PartitionID) { - log.Debug("filter invalid insert message, partition does not exist", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil + // filter out msg which not belongs to the loaded partitions + return nil, nil } } @@ -189,8 +198,8 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // so we need to compare the endTimestamp of received messages and position's timestamp. excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID) if err != nil { - log.Warn(err.Error()) - return nil + // QueryNode should addExcludedSegments for the current collection before start flow graph + return nil, err } for _, segmentInfo := range excludedSegments { // unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil @@ -198,24 +207,17 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg log.Warn("filter unFlushed segment without checkPoint", zap.Any("collectionID", msg.CollectionID), zap.Any("partitionID", msg.PartitionID)) - return nil + continue } if msg.SegmentID == segmentInfo.ID && msg.EndTs() < segmentInfo.DmlPosition.Timestamp { log.Debug("filter invalid insert message, segments are excluded segments", zap.Any("collectionID", msg.CollectionID), zap.Any("partitionID", msg.PartitionID)) - return nil + return nil, nil } } - if len(msg.Timestamps) <= 0 { - log.Debug("filter invalid insert message, no message", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) - return nil - } - - return msg + return msg, nil } // newFilteredDmNode returns a new 
filterDmNode diff --git a/internal/querynode/flow_graph_filter_dm_node_test.go b/internal/querynode/flow_graph_filter_dm_node_test.go index 1f883b58cc..b3ff39ae74 100644 --- a/internal/querynode/flow_graph_filter_dm_node_test.go +++ b/internal/querynode/flow_graph_filter_dm_node_test.go @@ -17,7 +17,6 @@ package querynode import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -31,7 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func getFilterDMNode(ctx context.Context) (*filterDmNode, error) { +func getFilterDMNode() (*filterDmNode, error) { streaming, err := genSimpleReplica() if err != nil { return nil, err @@ -42,25 +41,21 @@ func getFilterDMNode(ctx context.Context) (*filterDmNode, error) { } func TestFlowGraphFilterDmNode_filterDmNode(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) fg.Name() } func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - pkType := schemapb.DataType_Int64 schema := genTestCollectionSchema(pkType) t.Run("valid test", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() + assert.NoError(t, err) + res, err := fg.filterInvalidInsertMessage(msg) assert.NoError(t, err) - res := fg.filterInvalidInsertMessage(msg) assert.NotNil(t, res) }) @@ -68,51 +63,57 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) msg.CollectionID = UniqueID(1000) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) - res := fg.filterInvalidInsertMessage(msg) + fg.collectionID = UniqueID(1000) + res, err := fg.filterInvalidInsertMessage(msg) + 
assert.Error(t, err) assert.Nil(t, res) + fg.collectionID = defaultCollectionID }) t.Run("test no partition", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) msg.PartitionID = UniqueID(1000) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) col, err := fg.replica.getCollectionByID(defaultCollectionID) assert.NoError(t, err) col.setLoadType(loadTypePartition) - res := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test not target collection", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test no exclude segment", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) fg.replica.removeExcludedSegments(defaultCollectionID) - res := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg) + assert.Error(t, err) assert.Nil(t, res) }) t.Run("test segment is exclude segment", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) fg.replica.addExcludedSegments(defaultCollectionID, []*datapb.SegmentInfo{ { @@ -124,104 +125,114 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { }, }, }) - res := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test misaligned messages", func(t 
*testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) - res := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg) + assert.Error(t, err) assert.Nil(t, res) }) t.Run("test no data", func(t *testing.T) { msg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) msg.RowIDs = make([]IntPrimaryKey, 0) msg.RowData = make([]*commonpb.Blob, 0) - res := fg.filterInvalidInsertMessage(msg) + msg.NumRows = 0 + msg.FieldsData = nil + res, err := fg.filterInvalidInsertMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) } func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - t.Run("delete valid test", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() + assert.NoError(t, err) + res, err := fg.filterInvalidDeleteMessage(msg) assert.NoError(t, err) - res := fg.filterInvalidDeleteMessage(msg) assert.NotNil(t, res) }) t.Run("test delete no collection", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msg.CollectionID = UniqueID(1003) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) - res := fg.filterInvalidDeleteMessage(msg) + fg.collectionID = UniqueID(1003) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.Error(t, err) assert.Nil(t, res) + fg.collectionID = defaultCollectionID }) t.Run("test delete no partition", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) 
msg.PartitionID = UniqueID(1000) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) col, err := fg.replica.getCollectionByID(defaultCollectionID) assert.NoError(t, err) col.setLoadType(loadTypePartition) - res := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test delete not target collection", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) t.Run("test delete misaligned messages", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) - res := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg) + assert.Error(t, err) assert.Nil(t, res) }) t.Run("test delete no data", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) msg.NumRows = 0 msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0) - res := fg.filterInvalidDeleteMessage(msg) + msg.PrimaryKeys = &schemapb.IDs{} + res, err := fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{}) - res = fg.filterInvalidDeleteMessage(msg) + res, err = fg.filterInvalidDeleteMessage(msg) + assert.NoError(t, err) assert.Nil(t, res) }) } func TestFlowGraphFilterDmNode_Operate(t *testing.T) { - ctx, cancel := 
context.WithCancel(context.Background()) - defer cancel() pkType := schemapb.DataType_Int64 schema := genTestCollectionSchema(pkType) @@ -234,7 +245,7 @@ func TestFlowGraphFilterDmNode_Operate(t *testing.T) { t.Run("valid test", func(t *testing.T) { msg := genFilterDMMsg() - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) res := fg.Operate(msg) assert.NotNil(t, res) @@ -242,11 +253,48 @@ func TestFlowGraphFilterDmNode_Operate(t *testing.T) { t.Run("invalid input length", func(t *testing.T) { msg := genFilterDMMsg() - fg, err := getFilterDMNode(ctx) + fg, err := getFilterDMNode() assert.NoError(t, err) var m flowgraph.Msg msg = append(msg, m) res := fg.Operate(msg) assert.NotNil(t, res) }) + + t.Run("filterInvalidInsertMessage failed", func(t *testing.T) { + iMsg, err := genSimpleInsertMsg(schema, defaultDelLength) + assert.NoError(t, err) + iMsg.NumRows = 0 + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil) + fg, err := getFilterDMNode() + assert.NoError(t, err) + m := []flowgraph.Msg{msg} + assert.Panics(t, func() { + fg.Operate(m) + }) + }) + + t.Run("filterInvalidDeleteMessage failed", func(t *testing.T) { + dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + dMsg.NumRows = 0 + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) + fg, err := getFilterDMNode() + assert.NoError(t, err) + m := []flowgraph.Msg{msg} + assert.Panics(t, func() { + fg.Operate(m) + }) + }) + + t.Run("invalid msgType", func(t *testing.T) { + iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(schemapb.DataType_Int64), defaultDelLength) + assert.NoError(t, err) + iMsg.Base.MsgType = commonpb.MsgType_Search + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil) + + fg, err := getFilterDMNode() + assert.NoError(t, err) + res := fg.Operate([]flowgraph.Msg{msg}) + assert.NotNil(t, res) + }) } diff --git 
a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 0114794dfc..c1a26b5156 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -113,14 +113,18 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // if loadType is loadCollection, check if partition exists, if not, create partition col, err := iNode.streamingReplica.getCollectionByID(insertMsg.CollectionID) if err != nil { - log.Warn("failed to get collection", zap.Error(err)) - continue + // should not happen, QueryNode should create collection before start flow graph + err = fmt.Errorf("insertNode getCollectionByID failed, err = %s", err) + log.Error(err.Error()) + panic(err) } if col.getLoadType() == loadTypeCollection { err = iNode.streamingReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID) if err != nil { - log.Warn("failed to add partition", zap.Error(err)) - continue + // error occurs only when collection cannot be found, should not happen + err = fmt.Errorf("insertNode addPartition failed, err = %s", err) + log.Error(err.Error()) + panic(err) } } @@ -128,15 +132,19 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if !iNode.streamingReplica.hasSegment(insertMsg.SegmentID) { err := iNode.streamingReplica.addSegment(insertMsg.SegmentID, insertMsg.PartitionID, insertMsg.CollectionID, insertMsg.ShardName, segmentTypeGrowing) if err != nil { - log.Warn("failed to add segment", zap.Error(err)) - continue + // error occurs when collection or partition cannot be found, collection and partition should be created before + err = fmt.Errorf("insertNode addSegment failed, err = %s", err) + log.Error(err.Error()) + panic(err) } } insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg) if err != nil { - log.Warn("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err)) - return []Msg{} + // occurs only 
when schema doesn't have dim param, this should not happen + err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err) + log.Error(err.Error()) + panic(err) } iData.insertIDs[insertMsg.SegmentID] = append(iData.insertIDs[insertMsg.SegmentID], insertMsg.RowIDs...) @@ -148,8 +156,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } pks, err := getPrimaryKeys(insertMsg, iNode.streamingReplica) if err != nil { - log.Warn("failed to get primary keys", zap.Error(err)) - continue + // error occurs when cannot find collection or data is misaligned, should not happen + err = fmt.Errorf("failed to get primary keys, err = %s", err) + log.Error(err.Error()) + panic(err) } iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...) } @@ -158,16 +168,20 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID := range iData.insertRecords { var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID) if err != nil { - log.Warn(err.Error()) - continue + // should not happen, segment should be created before + err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err) + log.Error(err.Error()) + panic(err) } var numOfRecords = len(iData.insertIDs[segmentID]) if targetSegment != nil { offset, err := targetSegment.segmentPreInsert(numOfRecords) if err != nil { - log.Warn(err.Error()) - continue + // error occurs when cgo function `PreInsert` failed + err = fmt.Errorf("segmentPreInsert failed, segmentID = %d, err = %s", segmentID, err) + log.Error(err.Error()) + panic(err) } iData.insertOffset[segmentID] = offset log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID)) @@ -178,8 +192,17 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // 3. 
do insert wg := sync.WaitGroup{} for segmentID := range iData.insertRecords { + segmentID := segmentID wg.Add(1) - go iNode.insert(&iData, segmentID, &wg) + go func() { + err := iNode.insert(&iData, segmentID, &wg) + if err != nil { + // error occurs when segment cannot be found or cgo function `Insert` failed + err = fmt.Errorf("segment insert failed, segmentID = %d, err = %s", segmentID, err) + log.Error(err.Error()) + panic(err) + } + }() } wg.Wait() @@ -196,7 +219,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { zap.Any("collectionName", delMsg.CollectionName), zap.Int64("numPKs", delMsg.NumRows), zap.Any("timestamp", delMsg.Timestamps)) - processDeleteMessages(iNode.streamingReplica, delMsg, delData) + err := processDeleteMessages(iNode.streamingReplica, delMsg, delData) + if err != nil { + // error occurs when missing meta info or unexpected pk type, should not happen + err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err) + log.Error(err.Error()) + panic(err) + } } } @@ -204,8 +233,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID, pks := range delData.deleteIDs { segment, err := iNode.streamingReplica.getSegmentByID(segmentID) if err != nil { - log.Debug(err.Error()) - continue + // error occurs when segment cannot be found, should not happen + err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err) + log.Error(err.Error()) + panic(err) } offset := segment.segmentPreDelete(len(pks)) delData.deleteOffset[segmentID] = offset @@ -213,8 +244,17 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // 3. 
do delete for segmentID := range delData.deleteOffset { + segmentID := segmentID wg.Add(1) - go iNode.delete(delData, segmentID, &wg) + go func() { + err := iNode.delete(delData, segmentID, &wg) + if err != nil { + // error occurs when segment cannot be found, calling cgo function delete failed and etc... + err = fmt.Errorf("segment delete failed, segmentID = %d, err = %s", segmentID, err) + log.Error(err.Error()) + panic(err) + } + }() } wg.Wait() @@ -229,7 +269,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // processDeleteMessages would execute delete operations for growing segments -func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) { +func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) error { var partitionIDs []UniqueID var err error if msg.PartitionID != -1 { @@ -237,16 +277,14 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d } else { partitionIDs, err = replica.getPartitionIDs(msg.CollectionID) if err != nil { - log.Warn(err.Error()) - return + return err } } resultSegmentIDs := make([]UniqueID, 0) for _, partitionID := range partitionIDs { segmentIDs, err := replica.getSegmentIDs(partitionID) if err != nil { - log.Warn(err.Error()) - continue + return err } resultSegmentIDs = append(resultSegmentIDs, segmentIDs...) } @@ -255,26 +293,22 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d for _, segmentID := range resultSegmentIDs { segment, err := replica.getSegmentByID(segmentID) if err != nil { - log.Warn(err.Error()) - continue + return err } pks, tss, err := filterSegmentsByPKs(primaryKeys, msg.Timestamps, segment) if err != nil { - log.Warn(err.Error()) - continue + return err } if len(pks) > 0 { delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...) delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], tss...) 
} } + return nil } // filterSegmentsByPKs would filter segments by primary keys func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segment) ([]primaryKey, []Timestamp, error) { - if pks == nil { - return nil, nil, fmt.Errorf("pks is nil when getSegmentsByPKs") - } if segment == nil { return nil, nil, fmt.Errorf("segments is nil when getSegmentsByPKs") } @@ -304,18 +338,12 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm } // insert would execute insert operations for specific growing segment -func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) { +func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) error { + defer wg.Done() + var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID) if err != nil { - log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID)) - // TODO: add error handling - wg.Done() - return - } - - if targetSegment.segmentType != segmentTypeGrowing { - wg.Done() - return + return fmt.Errorf("getSegmentByID failed, err = %s", err) } ids := iData.insertIDs[segmentID] @@ -328,27 +356,23 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync. 
err = targetSegment.segmentInsert(offsets, ids, timestamps, insertRecord) if err != nil { - log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err)) - // TODO: add error handling - wg.Done() - return + return fmt.Errorf("segmentInsert failed, segmentID = %d, err = %s", segmentID, err) } log.Debug("Do insert done", zap.Int("len", len(iData.insertIDs[segmentID])), zap.Int64("collectionID", targetSegment.collectionID), zap.Int64("segmentID", segmentID)) - wg.Done() + return nil } // delete would execute delete operations for specific growing segment -func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) { +func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) error { defer wg.Done() targetSegment, err := iNode.streamingReplica.getSegmentByID(segmentID) if err != nil { - log.Warn(err.Error()) - return + return fmt.Errorf("getSegmentByID failed, err = %s", err) } if targetSegment.segmentType != segmentTypeGrowing { - return + return fmt.Errorf("unexpected segmentType when delete, segmentType = %s", targetSegment.segmentType.String()) } ids := deleteData.deleteIDs[segmentID] @@ -357,11 +381,11 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg * err = targetSegment.segmentDelete(offset, ids, timestamps) if err != nil { - log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err)) - return + return fmt.Errorf("segmentDelete failed, err = %s", err) } log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID)) + return nil } // TODO: remove this function to proper file @@ -415,8 +439,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll if t.Key == "dim" { dim, err := strconv.Atoi(t.Value) if err != nil { - log.Warn("strconv wrong on get dim", zap.Error(err)) - break + return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err) } offset += dim * 4 
break @@ -427,8 +450,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll if t.Key == "dim" { dim, err := strconv.Atoi(t.Value) if err != nil { - log.Warn("strconv wrong on get dim", zap.Error(err)) - return nil, err + return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err) } offset += dim / 8 break diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index cef071fdaf..417814d118 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -21,6 +21,8 @@ import ( "sync" "testing" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" @@ -31,6 +33,24 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) +func getInsertNode() (*insertNode, error) { + streaming, err := genSimpleReplica() + if err != nil { + return nil, err + } + + err = streaming.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultDMLChannel, + segmentTypeGrowing) + if err != nil { + return nil, err + } + + return newInsertNode(streaming), nil +} + func genFlowGraphInsertData(schema *schemapb.CollectionSchema, numRows int) (*insertData, error) { insertMsg, err := genSimpleInsertMsg(schema, numRows) if err != nil { @@ -76,49 +96,34 @@ func genFlowGraphDeleteData() (*deleteData, error) { } func TestFlowGraphInsertNode_insert(t *testing.T) { + pkType := schemapb.DataType_Int64 + schema := genTestCollectionSchema(pkType) + t.Run("test insert", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) - collection, err := 
streaming.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength) + insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(1) - insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID, wg) + assert.NoError(t, err) }) t.Run("test segment insert error", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) - collection, err := streaming.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength) + insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(1) insertData.insertRecords[defaultSegmentID] = insertData.insertRecords[defaultSegmentID][:len(insertData.insertRecords[defaultSegmentID])/2] - insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID, wg) + assert.Error(t, err) }) t.Run("test no target segment", func(t *testing.T) { @@ -127,84 +132,65 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { insertNode := newInsertNode(streaming) wg := &sync.WaitGroup{} wg.Add(1) - insertNode.insert(nil, defaultSegmentID, wg) + err = insertNode.insert(nil, defaultSegmentID, wg) + assert.Error(t, err) }) t.Run("test invalid segmentType", func(t *testing.T) { - streaming, err := genSimpleReplica() + insertNode, err := getInsertNode() assert.NoError(t, err) - insertNode := newInsertNode(streaming) - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, 
- segmentTypeSealed) + insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) + seg, err := insertNode.streamingReplica.getSegmentByID(defaultSegmentID) + assert.NoError(t, err) + seg.setType(segmentTypeSealed) + wg := &sync.WaitGroup{} wg.Add(1) - insertNode.insert(nil, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID, wg) + assert.Error(t, err) }) } func TestFlowGraphInsertNode_delete(t *testing.T) { + pkType := schemapb.DataType_Int64 + schema := genTestCollectionSchema(pkType) + t.Run("test insert and delete", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) - collection, err := streaming.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength) + insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(1) - insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID, wg) + assert.NoError(t, err) deleteData, err := genFlowGraphDeleteData() assert.NoError(t, err) wg.Add(1) - insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID, wg) + assert.NoError(t, err) }) t.Run("test only delete", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) deleteData, err := genFlowGraphDeleteData() assert.NoError(t, err) wg := 
&sync.WaitGroup{} wg.Add(1) - insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID, wg) + assert.NoError(t, err) }) t.Run("test segment delete error", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) deleteData, err := genFlowGraphDeleteData() @@ -212,7 +198,8 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2] - insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID, wg) + assert.Error(t, err) }) t.Run("test no target segment", func(t *testing.T) { @@ -221,27 +208,49 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { insertNode := newInsertNode(streaming) wg := &sync.WaitGroup{} wg.Add(1) - insertNode.delete(nil, defaultSegmentID, wg) + err = insertNode.delete(nil, defaultSegmentID, wg) + assert.Error(t, err) + }) +} + +func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) { + t.Run("test processDeleteMessages", func(t *testing.T) { + streaming, err := genSimpleReplica() + assert.NoError(t, err) + + dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + dData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + + err = processDeleteMessages(streaming, dMsg, dData) + assert.NoError(t, err) + }) + + t.Run("test processDeleteMessages", func(t *testing.T) { + streaming, err := genSimpleReplica() + assert.NoError(t, err) + + dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + dData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + + 
err = processDeleteMessages(streaming, dMsg, dData) + assert.NoError(t, err) }) } func TestFlowGraphInsertNode_operate(t *testing.T) { - t.Run("test operate", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) + pkType := schemapb.DataType_Int64 + schema := genTestCollectionSchema(pkType) - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + genMsgStreamInsertMsg := func() *msgstream.InsertMsg { + iMsg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) + return iMsg + } - collection, err := streaming.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - msgInsertMsg, err := genSimpleInsertMsg(collection.schema, defaultMsgLength) - assert.NoError(t, err) + genInsertMsg := func() *insertMsg { + msgInsertMsg := genMsgStreamInsertMsg() msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) iMsg := insertMsg{ insertMessages: []*msgstream.InsertMsg{ @@ -251,28 +260,26 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { msgDeleteMsg, }, } - msg := []flowgraph.Msg{&iMsg} + return &iMsg + } + + t.Run("test operate", func(t *testing.T) { + insertNode, err := getInsertNode() + assert.NoError(t, err) + + msg := []flowgraph.Msg{genInsertMsg()} insertNode.Operate(msg) - s, err := streaming.getSegmentByID(defaultSegmentID) + s, err := insertNode.streamingReplica.getSegmentByID(defaultSegmentID) assert.Nil(t, err) buf := make([]byte, 8) for i := 0; i < defaultMsgLength; i++ { common.Endian.PutUint64(buf, uint64(i)) assert.True(t, s.pkFilter.Test(buf)) } - }) t.Run("test invalid partitionID", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - 
segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) @@ -288,15 +295,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { }) t.Run("test collection partition not exist", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) @@ -309,19 +308,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { }, } msg := []flowgraph.Msg{&iMsg} - insertNode.Operate(msg) + assert.Panics(t, func() { + insertNode.Operate(msg) + }) }) t.Run("test partition not exist", func(t *testing.T) { - streaming, err := genSimpleReplica() - assert.NoError(t, err) - insertNode := newInsertNode(streaming) - - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + insertNode, err := getInsertNode() assert.NoError(t, err) msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) @@ -333,36 +326,63 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { }, } msg := []flowgraph.Msg{&iMsg} - insertNode.Operate(msg) + assert.Panics(t, func() { + insertNode.Operate(msg) + }) }) t.Run("test invalid input length", func(t *testing.T) { + insertNode, err := getInsertNode() + assert.NoError(t, err) + msg := []flowgraph.Msg{genInsertMsg(), genInsertMsg()} + insertNode.Operate(msg) + }) + + t.Run("test getCollectionByID failed", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) insertNode := newInsertNode(streaming) - err = streaming.addSegment(defaultSegmentID, - defaultPartitionID, - 
defaultCollectionID, - defaultDMLChannel, - segmentTypeGrowing) + msg := []flowgraph.Msg{genInsertMsg()} + + err = insertNode.streamingReplica.removeCollection(defaultCollectionID) + assert.NoError(t, err) + assert.Panics(t, func() { + insertNode.Operate(msg) + }) + }) + + t.Run("test TransferInsertMsgToInsertRecord failed", func(t *testing.T) { + insertNode, err := getInsertNode() assert.NoError(t, err) - collection, err := streaming.getCollectionByID(defaultCollectionID) + col, err := insertNode.streamingReplica.getCollectionByID(defaultCollectionID) assert.NoError(t, err) - msgInsertMsg, err := genSimpleInsertMsg(collection.schema, defaultMsgLength) - assert.NoError(t, err) - msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - iMsg := insertMsg{ - insertMessages: []*msgstream.InsertMsg{ - msgInsertMsg, - }, - deleteMessages: []*msgstream.DeleteMsg{ - msgDeleteMsg, - }, + + for i, field := range col.schema.GetFields() { + if field.DataType == schemapb.DataType_FloatVector { + col.schema.Fields[i].TypeParams = nil + } } - msg := []flowgraph.Msg{&iMsg, &iMsg} - insertNode.Operate(msg) + + iMsg := genInsertMsg() + iMsg.insertMessages[0].Version = internalpb.InsertDataVersion_RowBased + msg := []flowgraph.Msg{iMsg} + assert.Panics(t, func() { + insertNode.Operate(msg) + }) + }) + + t.Run("test getPrimaryKeys failed", func(t *testing.T) { + insertNode, err := getInsertNode() + assert.NoError(t, err) + + iMsg := genInsertMsg() + iMsg.insertMessages[0].NumRows = 0 + msg := []flowgraph.Msg{iMsg} + assert.Panics(t, func() { + insertNode.Operate(msg) + }) }) } @@ -394,7 +414,7 @@ func TestFilterSegmentsByPKs(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(pks), 0) _, _, err = filterSegmentsByPKs(nil, timestamps, segment) - assert.NotNil(t, err) + assert.NoError(t, err) _, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil) assert.NotNil(t, err) }) @@ -424,7 +444,7 @@ func 
TestFilterSegmentsByPKs(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(pks), 0) _, _, err = filterSegmentsByPKs(nil, timestamps, segment) - assert.NotNil(t, err) + assert.NoError(t, err) _, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil) assert.NotNil(t, err) }) diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 964f11466c..e5a6479e93 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -20,10 +20,11 @@ import ( "fmt" "reflect" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/tsoutil" - "go.uber.org/zap" ) // serviceTimeNode is one of the nodes in delta flow graph @@ -63,10 +64,8 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // update service time err := stNode.tSafeReplica.setTSafe(stNode.vChannel, serviceTimeMsg.timeRange.timestampMax) if err != nil { - log.Error("serviceTimeNode setTSafe failed", - zap.Any("collectionID", stNode.collectionID), - zap.Error(err), - ) + // should not happen, QueryNode should addTSafe before start flow graph + panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", stNode.collectionID, err)) } p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax) log.RatedDebug(10.0, "update tSafe:", diff --git a/internal/querynode/flow_graph_service_time_node_test.go b/internal/querynode/flow_graph_service_time_node_test.go index 693107da9f..981048f02e 100644 --- a/internal/querynode/flow_graph_service_time_node_test.go +++ b/internal/querynode/flow_graph_service_time_node_test.go @@ -19,6 +19,8 @@ package querynode import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/util/flowgraph" ) @@ -78,7 +80,9 @@ func TestServiceTimeNode_Operate(t 
*testing.T) { timestampMax: 1000, }, } - in := []flowgraph.Msg{msg, msg} - node.Operate(in) + in := []flowgraph.Msg{msg} + assert.Panics(t, func() { + node.Operate(in) + }) }) } diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go index d8e8c7261b..f98437709b 100644 --- a/internal/querynode/meta_replica.go +++ b/internal/querynode/meta_replica.go @@ -262,7 +262,9 @@ func (replica *metaReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, return nil, err } - return collection.partitionIDs, nil + parID := make([]UniqueID, len(collection.partitionIDs)) + copy(parID, collection.partitionIDs) + return parID, nil } func (replica *metaReplica) getIndexedFieldIDByCollectionIDPrivate(collectionID UniqueID, segment *Segment) ([]FieldID, error) { @@ -500,7 +502,10 @@ func (replica *metaReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]Unique if err2 != nil { return nil, err2 } - return partition.segmentIDs, nil + + segIDs := make([]UniqueID, len(partition.segmentIDs)) + copy(segIDs, partition.segmentIDs) + return segIDs, nil } //----------------------------------------------------------------------------------------------------- segment diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 91b46b8a60..3273fc62f2 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -973,7 +973,7 @@ func genSimpleInsertMsg(schema *schemapb.CollectionSchema, numRows int) (*msgstr return &msgstream.InsertMsg{ BaseMsg: genMsgStreamBaseMsg(), InsertRequest: internalpb.InsertRequest{ - Base: genCommonMsgBase(commonpb.MsgType_Retrieve), + Base: genCommonMsgBase(commonpb.MsgType_Insert), CollectionName: defaultCollectionName, PartitionName: defaultPartitionName, CollectionID: defaultCollectionID, diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index a639ac0e9c..718e862f73 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -596,7 +596,7 @@ 
func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [ s.segPtrMu.RLock() defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentType != segmentTypeGrowing { - return nil + return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String()) } if s.segmentPtr == nil { diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index a65dbbb4d1..1381d28d91 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -343,7 +343,7 @@ func TestSegment_segmentInsert(t *testing.T) { segment, err := genSimpleSealedSegment(defaultMsgLength) assert.NoError(t, err) err = segment.segmentInsert(0, nil, nil, nil) - assert.NoError(t, err) + assert.Error(t, err) }) } diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 4bb4e31788..a894e939bd 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -827,14 +827,17 @@ func TestTask_releasePartitionTask(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - col, err := node.historical.getCollectionByID(defaultCollectionID) + hisCol, err := node.historical.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + strCol, err := node.streaming.getCollectionByID(defaultCollectionID) assert.NoError(t, err) err = node.historical.removePartition(defaultPartitionID) assert.NoError(t, err) - col.addVDeltaChannels([]Channel{defaultDeltaChannel}) - col.setLoadType(loadTypePartition) + hisCol.addVDeltaChannels([]Channel{defaultDeltaChannel}) + hisCol.setLoadType(loadTypePartition) + strCol.setLoadType(loadTypePartition) /* err = node.queryService.addQueryCollection(defaultCollectionID)