From 55f148f17e1202ba07ea2d562906818eed669063 Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 15 Oct 2021 16:54:43 +0800 Subject: [PATCH] Add deleteMessage in flow_graph_filter_dm_node (#9504) Signed-off-by: fishpenguin --- .../querynode/flow_graph_filter_dm_node.go | 67 ++++++++++++++ .../flow_graph_filter_dm_node_test.go | 89 +++++++++++++++++++ internal/querynode/flow_graph_message.go | 1 + 3 files changed, 157 insertions(+) diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 37986b2ae0..af1cbc5494 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -63,11 +63,13 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { var iMsg = insertMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), + deleteMessages: make([]*msgstream.DeleteMsg, 0), timeRange: TimeRange{ timestampMin: msgStreamMsg.TimestampMin(), timestampMax: msgStreamMsg.TimestampMax(), }, } + for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_Insert: @@ -75,6 +77,11 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if resMsg != nil { iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } + case commonpb.MsgType_Delete: + resMsg := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + if resMsg != nil { + iMsg.deleteMessages = append(iMsg.deleteMessages, resMsg) + } default: log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type()))) } @@ -87,6 +94,66 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { return []Msg{res} } +func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + msg.SetTraceCtx(ctx) + defer sp.Finish() + + // check if collection and partition exist + collection := fdmNode.replica.hasCollection(msg.CollectionID) + partition := fdmNode.replica.hasPartition(msg.PartitionID) + if fdmNode.loadType == loadTypeCollection && !collection { + log.Debug("filter invalid delete message, collection dose not exist", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil + } + if fdmNode.loadType == loadTypePartition && !partition { + log.Debug("filter invalid delete message, partition dose not exist", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil + } + + if msg.CollectionID != fdmNode.collectionID { + return nil + } + + // if the flow graph type is partition, check if the partition is target partition + if fdmNode.loadType == loadTypePartition && msg.PartitionID != fdmNode.partitionID { + log.Debug("filter invalid delete message, partition is not the target partition", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil + } + + // check if partition has been released + if fdmNode.loadType == loadTypeCollection { + col, err := fdmNode.replica.getCollectionByID(msg.CollectionID) + if err != nil { + log.Warn(err.Error()) + return nil + } + if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil { + log.Warn(err.Error()) + return nil + } + } + + if len(msg.PrimaryKeys) != len(msg.Timestamps) { + log.Warn("Error, misaligned messages detected") + return nil + } + + if len(msg.Timestamps) <= 0 { + log.Debug("filter invalid delete message, no message", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil + } + return msg +} + func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) msg.SetTraceCtx(ctx) diff --git a/internal/querynode/flow_graph_filter_dm_node_test.go b/internal/querynode/flow_graph_filter_dm_node_test.go index bc3a067f61..b79cba3136 100644 --- a/internal/querynode/flow_graph_filter_dm_node_test.go +++ b/internal/querynode/flow_graph_filter_dm_node_test.go @@ -168,6 +168,95 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { }) } +func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("delete valid test", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + res := fg.filterInvalidDeleteMessage(msg) + assert.NotNil(t, res) + }) + + t.Run("test delete no collection", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + msg.CollectionID = UniqueID(1000) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete no partition", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + msg.PartitionID = UniqueID(1000) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + fg.loadType = loadTypePartition + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete not target collection", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + fg.collectionID = UniqueID(1000) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete not target partition", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + fg.loadType = loadTypePartition + fg.partitionID = UniqueID(1000) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete released partition", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + col, err := fg.replica.getCollectionByID(defaultCollectionID) + assert.NoError(t, err) + col.addReleasedPartition(defaultPartitionID) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete misaligned messages", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + msg.Timestamps = make([]Timestamp, 0) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete no data", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDMNode(ctx) + assert.NoError(t, err) + msg.Timestamps = make([]Timestamp, 0) + msg.PrimaryKeys = make([]IntPrimaryKey, 0) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) +} + func TestFlowGraphFilterDmNode_Operate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index e56fa87a7a..2f6f7550a3 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -24,6 +24,7 @@ type MsgStreamMsg = flowgraph.MsgStreamMsg type insertMsg struct { insertMessages []*msgstream.InsertMsg + deleteMessages []*msgstream.DeleteMsg timeRange TimeRange }